Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client struct: lazy init components and optimize struct layout #1405

Merged
merged 7 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1955,7 +1955,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {

if (getClientType(c) == CLIENT_TYPE_PUBSUB) {
/* Check for pattern violations. */
dictIterator *di = dictGetIterator(c->pubsub_patterns);
dictIterator *di = dictGetIterator(c->pubsub_data->pubsub_patterns);
dictEntry *de;
while (!kill && ((de = dictNext(di)) != NULL)) {
o = dictGetKey(de);
Expand All @@ -1967,7 +1967,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {
/* Check for channel violations. */
if (!kill) {
/* Check for global channels violation. */
di = dictGetIterator(c->pubsub_channels);
di = dictGetIterator(c->pubsub_data->pubsub_channels);

while (!kill && ((de = dictNext(di)) != NULL)) {
o = dictGetKey(de);
Expand All @@ -1978,7 +1978,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {
}
if (!kill) {
/* Check for shard channels violation. */
di = dictGetIterator(c->pubsubshard_channels);
di = dictGetIterator(c->pubsub_data->pubsubshard_channels);
while (!kill && ((de = dictNext(di)) != NULL)) {
o = dictGetKey(de);
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0);
Expand Down
3 changes: 2 additions & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,8 @@ struct client *createAOFClient(void) {

/* We set the fake client as a replica waiting for the synchronization
* so that the server will not try to send replies to this client. */
c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
initClientReplicationData(c);
c->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
return c;
}

Expand Down
127 changes: 71 additions & 56 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,25 @@ static void moduleUnblockClientOnKey(client *c, robj *key);
static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);

void initClientBlockingState(client *c) {
c->bstate.btype = BLOCKED_NONE;
c->bstate.timeout = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bstate.numreplicas = 0;
c->bstate.numlocal = 0;
c->bstate.reploffset = 0;
c->bstate.generic_blocked_list_node = NULL;
c->bstate.module_blocked_handle = NULL;
c->bstate.async_rm_call_handle = NULL;
if (c->bstate) return;
c->bstate = zmalloc(sizeof(blockingState));
c->bstate->btype = BLOCKED_NONE;
c->bstate->timeout = 0;
c->bstate->unblock_on_nokey = 0;
c->bstate->keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bstate->numreplicas = 0;
c->bstate->numlocal = 0;
c->bstate->reploffset = 0;
c->bstate->generic_blocked_list_node = NULL;
c->bstate->module_blocked_handle = NULL;
c->bstate->async_rm_call_handle = NULL;
}

void freeClientBlockingState(client *c) {
if (!c->bstate) return;
dictRelease(c->bstate->keys);
zfree(c->bstate);
c->bstate = NULL;
}

/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
Expand All @@ -94,8 +103,10 @@ void blockClient(client *c, int btype) {
/* Primary client should never be blocked unless pause or module */
serverAssert(!(c->flag.primary && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));

initClientBlockingState(c);

c->flag.blocked = 1;
c->bstate.btype = btype;
c->bstate->btype = btype;
if (!c->flag.module)
server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[btype]++;
Expand Down Expand Up @@ -199,26 +210,26 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c, int queue_for_reprocessing) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
if (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET || c->bstate->btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT) {
} else if (c->bstate->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
} else if (c->bstate->btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else if (c->bstate.btype == BLOCKED_POSTPONE) {
serverAssert(c->bstate.postponed_list_node);
listDelNode(server.postponed_clients, c->bstate.postponed_list_node);
c->bstate.postponed_list_node = NULL;
} else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
} else if (c->bstate->btype == BLOCKED_POSTPONE) {
serverAssert(c->bstate->postponed_list_node);
listDelNode(server.postponed_clients, c->bstate->postponed_list_node);
c->bstate->postponed_list_node = NULL;
} else if (c->bstate->btype == BLOCKED_SHUTDOWN) {
/* No special cleanup. */
} else {
serverPanic("Unknown btype in unblockClient().");
}

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!c->flag.pending_command && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!c->flag.pending_command && c->bstate->btype != BLOCKED_SHUTDOWN) {
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
Expand All @@ -229,12 +240,12 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* We count blocked client stats on regular clients and not on module clients */
if (!c->flag.module) server.blocked_clients--;
server.blocked_clients_by_type[c->bstate.btype]--;
server.blocked_clients_by_type[c->bstate->btype]--;
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
c->flag.blocked = 0;
c->bstate.btype = BLOCKED_NONE;
c->bstate.unblock_on_nokey = 0;
c->bstate->btype = BLOCKED_NONE;
c->bstate->unblock_on_nokey = 0;
removeClientFromTimeoutTable(c);
if (queue_for_reprocessing) queueClientForReprocessing(c);
}
Expand All @@ -243,22 +254,22 @@ void unblockClient(client *c, int queue_for_reprocessing) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
if (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET || c->bstate->btype == BLOCKED_STREAM) {
addReplyNullArray(c);
updateStatsOnUnblock(c, 0, 0, 0);
} else if (c->bstate.btype == BLOCKED_WAIT) {
} else if (c->bstate->btype == BLOCKED_WAIT) {
if (c->cmd->proc == waitCommand) {
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset));
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate->reploffset));
} else if (c->cmd->proc == waitaofCommand) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset));
addReplyLongLong(c, server.fsynced_reploff >= c->bstate->reploffset);
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate->reploffset));
} else if (c->cmd->proc == clusterCommand) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown wait command %s in replyToBlockedClientTimedOut().", c->cmd->declared_name);
}
} else if (c->bstate.btype == BLOCKED_MODULE) {
} else if (c->bstate->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
Expand All @@ -274,7 +285,7 @@ void replyToClientsBlockedOnShutdown(void) {
listRewind(server.clients, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flag.blocked && c->bstate.btype == BLOCKED_SHUTDOWN) {
if (c->flag.blocked && c->bstate->btype == BLOCKED_SHUTDOWN) {
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
unblockClient(c, 1);
}
Expand All @@ -301,7 +312,7 @@ void disconnectAllBlockedClients(void) {
* command processing will start from scratch, and the command will
* be either executed or rejected. (unlike LIST blocked clients for
* which the command is already in progress in a way. */
if (c->bstate.btype == BLOCKED_POSTPONE) continue;
if (c->bstate->btype == BLOCKED_POSTPONE) continue;

unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
Expand Down Expand Up @@ -386,15 +397,17 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;

initClientBlockingState(c);

if (!c->flag.reprocessing_command) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
c->bstate->timeout = timeout;
}

for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dictionary ignore it. */
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys, keys[j], NULL))) {
if (!(client_blocked_entry = dictAddRaw(c->bstate->keys, keys[j], NULL))) {
continue;
}
incrRefCount(keys[j]);
Expand All @@ -411,7 +424,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
l = dictGetVal(db_blocked_existing_entry);
}
listAddNodeTail(l, c);
dictSetVal(c->bstate.keys, client_blocked_entry, listLast(l));
dictSetVal(c->bstate->keys, client_blocked_entry, listLast(l));

/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
* wants to be awakened if key is deleted (like XREADGROUP) */
Expand All @@ -425,7 +438,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
}
}
}
c->bstate.unblock_on_nokey = unblock_on_nokey;
c->bstate->unblock_on_nokey = unblock_on_nokey;
/* Currently we assume key blocking will require reprocessing the command.
* However in case of modules, they have a different way to handle the reprocessing
* which does not require setting the pending command flag */
Expand All @@ -439,15 +452,15 @@ static void unblockClientWaitingData(client *c) {
dictEntry *de;
dictIterator *di;

if (dictSize(c->bstate.keys) == 0) return;
if (dictSize(c->bstate->keys) == 0) return;

di = dictGetIterator(c->bstate.keys);
di = dictGetIterator(c->bstate->keys);
/* The client may wait for multiple keys, so unblock it for every key. */
while ((de = dictNext(di)) != NULL) {
releaseBlockedEntry(c, de, 0);
}
dictReleaseIterator(di);
dictEmpty(c->bstate.keys, NULL);
dictEmpty(c->bstate->keys, NULL);
}

static blocking_type getBlockedTypeByType(int type) {
Expand Down Expand Up @@ -546,7 +559,7 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
if (listLength(l) == 0) {
dictDelete(c->db->blocking_keys, key);
dictDelete(c->db->blocking_keys_unblock_on_nokey, key);
} else if (c->bstate.unblock_on_nokey) {
} else if (c->bstate->unblock_on_nokey) {
unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey, key);
/* it is not possible to have a client blocked on nokey with no matching entry */
serverAssertWithInfo(c, key, unblock_on_nokey_entry != NULL);
Expand All @@ -555,7 +568,7 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
dictDelete(c->db->blocking_keys_unblock_on_nokey, key);
}
}
if (remove_key) dictDelete(c->bstate.keys, key);
if (remove_key) dictDelete(c->bstate->keys, key);
}

void signalKeyAsReady(serverDb *db, robj *key, int type) {
Expand Down Expand Up @@ -593,9 +606,9 @@ static void handleClientsBlockedOnKey(readyList *rl) {
* module is trying to accomplish right now.
* 3. In case of XREADGROUP call we will want to unblock on any change in object type
* or in case the key was deleted, since the group is no longer valid. */
if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) ||
(o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) || (receiver->bstate.unblock_on_nokey)) {
if (receiver->bstate.btype != BLOCKED_MODULE)
if ((o != NULL && (receiver->bstate->btype == getBlockedTypeByType(o->type))) ||
(o != NULL && (receiver->bstate->btype == BLOCKED_MODULE)) || (receiver->bstate->unblock_on_nokey)) {
if (receiver->bstate->btype != BLOCKED_MODULE)
unblockClientOnKey(receiver, rl->key);
else
moduleUnblockClientOnKey(receiver, rl->key);
Expand All @@ -606,28 +619,30 @@ static void handleClientsBlockedOnKey(readyList *rl) {

/* block a client for replica acknowledgement */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
initClientBlockingState(c);
c->bstate->timeout = timeout;
c->bstate->reploffset = offset;
c->bstate->numreplicas = numreplicas;
c->bstate->numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
/* Note that we remember the linked list node where the client is stored,
* this way removing the client in unblockClientWaitingReplicas() will not
* require a linear scan, but just a constant time operation. */
serverAssert(c->bstate.client_waiting_acks_list_node == NULL);
c->bstate.client_waiting_acks_list_node = listFirst(server.clients_waiting_acks);
serverAssert(c->bstate->client_waiting_acks_list_node == NULL);
c->bstate->client_waiting_acks_list_node = listFirst(server.clients_waiting_acks);
blockClient(c, BLOCKED_WAIT);
}

/* Postpone client from executing a command. For example the server might be busy
* requesting to avoid processing clients commands which will be processed later
* when the it is ready to accept them. */
void blockPostponeClient(client *c) {
c->bstate.timeout = 0;
initClientBlockingState(c);
c->bstate->timeout = 0;
blockClient(c, BLOCKED_POSTPONE);
listAddNodeTail(server.postponed_clients, c);
serverAssert(c->bstate.postponed_list_node == NULL);
c->bstate.postponed_list_node = listLast(server.postponed_clients);
serverAssert(c->bstate->postponed_list_node == NULL);
c->bstate->postponed_list_node = listLast(server.postponed_clients);
/* Mark this client to execute its command */
c->flag.pending_command = 1;
}
Expand All @@ -644,13 +659,13 @@ void blockClientShutdown(client *c) {
static void unblockClientOnKey(client *c, robj *key) {
dictEntry *de;

de = dictFind(c->bstate.keys, key);
de = dictFind(c->bstate->keys, key);
releaseBlockedEntry(c, de, 1);

/* Only in case of blocking API calls, we might be blocked on several keys.
however we should force unblock the entire blocking keys */
serverAssert(c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET);
serverAssert(c->bstate->btype == BLOCKED_STREAM || c->bstate->btype == BLOCKED_LIST ||
c->bstate->btype == BLOCKED_ZSET);

/* We need to unblock the client before calling processCommandAndResetClient
* because it checks the CLIENT_BLOCKED flag */
Expand Down Expand Up @@ -712,7 +727,7 @@ static void moduleUnblockClientOnKey(client *c, robj *key) {
* command with timeout reply. */
void unblockClientOnTimeout(client *c) {
/* The client has been unlocked (in the moduleUnblocked list), return ASAP. */
if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;
if (c->bstate->btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;

replyToBlockedClientTimedOut(c);
if (c->flag.pending_command) c->flag.pending_command = 0;
Expand Down
14 changes: 7 additions & 7 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error. */
if (!c->flag.multi) return myself;
ms = &c->mstate;
ms = c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
* structure if the client is not in MULTI/EXEC state, this way
Expand All @@ -1023,7 +1023,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
int pubsubshard_included =
(cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB));
(cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB));

/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
Expand Down Expand Up @@ -1176,7 +1176,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* node is a replica and the request is about a hash slot our primary
* is serving, we can reply without redirection. */
int is_write_command =
(cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
(cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE));
if ((c->flag.readonly || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == n) {
return myself;
Expand Down Expand Up @@ -1233,14 +1233,14 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
* returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
clusterNode *myself = getMyClusterNode();
if (c->flag.blocked && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
if (c->flag.blocked && (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET ||
c->bstate->btype == BLOCKED_STREAM || c->bstate->btype == BLOCKED_MODULE)) {
dictEntry *de;
dictIterator *di;

/* If the client is blocked on module, but not on a specific key,
* don't unblock it. */
if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) return 0;
if (c->bstate->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) return 0;

/* If the cluster is down, unblock the client with the right error.
* If the cluster is configured to allow reads on cluster down, we
Expand All @@ -1252,7 +1252,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
}

/* All keys must belong to the same slot, so check first key only. */
di = dictGetIterator(c->bstate.keys);
di = dictGetIterator(c->bstate->keys);
if ((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int slot = keyHashSlot((char *)key->ptr, sdslen(key->ptr));
Expand Down
Loading
Loading