Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call makeObjectShared to normalize a shared object into a true shared object #1566

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 109 additions & 109 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1964,153 +1964,153 @@ void afterSleep(struct aeEventLoop *eventLoop, int numevents) {
* initializations can be moved back inside createSharedObjects() below. */
void createSharedObjectsWithCompat(void) {
const char *name = server.extended_redis_compat ? "Redis" : SERVER_TITLE;
if (shared.loadingerr) decrRefCount(shared.loadingerr);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm struggling with this, not sure how to handle it yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little sad that we need to store two versions of these strings and need to check the compat flag every time we return these strings. We have already planned to remove this compat config at some time.

If we can make extended-redis-compat a readonly config, we can initialize them only once, but that's not perfect. One of the purposes is that users can change them in runtime if they have some problem.

Another idea: These strings are changed only when the compat config is changed. Is it possible to wait for all threads to stop and then replace the shared objects and free the old ones? We do something similar for MODULE UNLOAD to make sure no module is accessing a command that is going to be removed:

void moduleUnregisterCommands(struct ValkeyModule *module) {
    /* Drain IO queue before modifying commands dictionary to prevent concurrent access while modifying it. */
    drainIOThreadsQueue();

WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lazyfree job does not fit in the I/O threads, unforunately

shared.loadingerr =
createObject(OBJ_STRING, sdscatfmt(sdsempty(), "-LOADING %s is loading the dataset in memory\r\n", name));
if (shared.slowevalerr) decrRefCount(shared.slowevalerr);
shared.slowevalerr = createObject(
shared.loadingerr = makeObjectShared(createObject(
OBJ_STRING, sdscatfmt(sdsempty(), "-LOADING %s is loading the dataset in memory\r\n", name)));
shared.slowevalerr = makeObjectShared(createObject(
OBJ_STRING,
sdscatfmt(sdsempty(),
"-BUSY %s is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n", name));
if (shared.slowscripterr) decrRefCount(shared.slowscripterr);
shared.slowscripterr = createObject(
"-BUSY %s is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n", name)));
shared.slowscripterr = makeObjectShared(createObject(
OBJ_STRING,
sdscatfmt(sdsempty(),
"-BUSY %s is busy running a script. You can only call FUNCTION KILL or SHUTDOWN NOSAVE.\r\n", name));
if (shared.slowmoduleerr) decrRefCount(shared.slowmoduleerr);
shared.slowmoduleerr =
createObject(OBJ_STRING, sdscatfmt(sdsempty(), "-BUSY %s is busy running a module command.\r\n", name));
if (shared.bgsaveerr) decrRefCount(shared.bgsaveerr);
shared.bgsaveerr =
"-BUSY %s is busy running a script. You can only call FUNCTION KILL or SHUTDOWN NOSAVE.\r\n", name)));
shared.slowmoduleerr = makeObjectShared(
createObject(OBJ_STRING, sdscatfmt(sdsempty(), "-BUSY %s is busy running a module command.\r\n", name)));
shared.bgsaveerr = makeObjectShared(
createObject(OBJ_STRING, sdscatfmt(sdsempty(),
"-MISCONF %s is configured to save RDB snapshots, but it's currently"
" unable to persist to disk. Commands that may modify the data set are"
" disabled, because this instance is configured to report errors during"
" writes if RDB snapshotting fails (stop-writes-on-bgsave-error option)."
" Please check the %s logs for details about the RDB error.\r\n",
name, name));
name, name)));
}

void createSharedObjects(void) {
int j;

/* Shared command responses */
shared.ok = createObject(OBJ_STRING, sdsnew("+OK\r\n"));
shared.emptybulk = createObject(OBJ_STRING, sdsnew("$0\r\n\r\n"));
shared.czero = createObject(OBJ_STRING, sdsnew(":0\r\n"));
shared.cone = createObject(OBJ_STRING, sdsnew(":1\r\n"));
shared.emptyarray = createObject(OBJ_STRING, sdsnew("*0\r\n"));
shared.pong = createObject(OBJ_STRING, sdsnew("+PONG\r\n"));
shared.queued = createObject(OBJ_STRING, sdsnew("+QUEUED\r\n"));
shared.emptyscan = createObject(OBJ_STRING, sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
shared.space = createObject(OBJ_STRING, sdsnew(" "));
shared.plus = createObject(OBJ_STRING, sdsnew("+"));
shared.ok = makeObjectShared(createObject(OBJ_STRING, sdsnew("+OK\r\n")));
shared.emptybulk = makeObjectShared(createObject(OBJ_STRING, sdsnew("$0\r\n\r\n")));
shared.czero = makeObjectShared(createObject(OBJ_STRING, sdsnew(":0\r\n")));
shared.cone = makeObjectShared(createObject(OBJ_STRING, sdsnew(":1\r\n")));
shared.emptyarray = makeObjectShared(createObject(OBJ_STRING, sdsnew("*0\r\n")));
shared.pong = makeObjectShared(createObject(OBJ_STRING, sdsnew("+PONG\r\n")));
shared.queued = makeObjectShared(createObject(OBJ_STRING, sdsnew("+QUEUED\r\n")));
shared.emptyscan = makeObjectShared(createObject(OBJ_STRING, sdsnew("*2\r\n$1\r\n0\r\n*0\r\n")));
shared.space = makeObjectShared(createObject(OBJ_STRING, sdsnew(" ")));
shared.plus = makeObjectShared(createObject(OBJ_STRING, sdsnew("+")));

/* Shared command error responses */
shared.wrongtypeerr =
createObject(OBJ_STRING, sdsnew("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
shared.err = createObject(OBJ_STRING, sdsnew("-ERR\r\n"));
shared.nokeyerr = createObject(OBJ_STRING, sdsnew("-ERR no such key\r\n"));
shared.syntaxerr = createObject(OBJ_STRING, sdsnew("-ERR syntax error\r\n"));
shared.sameobjecterr = createObject(OBJ_STRING, sdsnew("-ERR source and destination objects are the same\r\n"));
shared.outofrangeerr = createObject(OBJ_STRING, sdsnew("-ERR index out of range\r\n"));
shared.noscripterr = createObject(OBJ_STRING, sdsnew("-NOSCRIPT No matching script.\r\n"));
shared.wrongtypeerr = makeObjectShared(createObject(
OBJ_STRING, sdsnew("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")));
shared.err = makeObjectShared(createObject(OBJ_STRING, sdsnew("-ERR\r\n")));
shared.nokeyerr = makeObjectShared(createObject(OBJ_STRING, sdsnew("-ERR no such key\r\n")));
shared.syntaxerr = makeObjectShared(createObject(OBJ_STRING, sdsnew("-ERR syntax error\r\n")));
shared.sameobjecterr = makeObjectShared(createObject(
OBJ_STRING, sdsnew("-ERR source and destination objects are the same\r\n")));
shared.outofrangeerr = makeObjectShared(createObject(OBJ_STRING, sdsnew("-ERR index out of range\r\n")));
shared.noscripterr = makeObjectShared(createObject(OBJ_STRING, sdsnew("-NOSCRIPT No matching script.\r\n")));
createSharedObjectsWithCompat();
shared.primarydownerr = createObject(
OBJ_STRING, sdsnew("-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
shared.roreplicaerr =
createObject(OBJ_STRING, sdsnew("-READONLY You can't write against a read only replica.\r\n"));
shared.noautherr = createObject(OBJ_STRING, sdsnew("-NOAUTH Authentication required.\r\n"));
shared.oomerr = createObject(OBJ_STRING, sdsnew("-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
shared.execaborterr =
createObject(OBJ_STRING, sdsnew("-EXECABORT Transaction discarded because of previous errors.\r\n"));
shared.noreplicaserr = createObject(OBJ_STRING, sdsnew("-NOREPLICAS Not enough good replicas to write.\r\n"));
shared.busykeyerr = createObject(OBJ_STRING, sdsnew("-BUSYKEY Target key name already exists.\r\n"));
shared.primarydownerr = makeObjectShared(createObject(
OBJ_STRING, sdsnew("-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n")));
shared.roreplicaerr = makeObjectShared((createObject(
OBJ_STRING, sdsnew("-READONLY You can't write against a read only replica.\r\n"))));
shared.noautherr = makeObjectShared(createObject(
OBJ_STRING, sdsnew("-NOAUTH Authentication required.\r\n")));
shared.oomerr = makeObjectShared(createObject(
OBJ_STRING, sdsnew("-OOM command not allowed when used memory > 'maxmemory'.\r\n")));
shared.execaborterr = makeObjectShared(createObject(
OBJ_STRING, sdsnew("-EXECABORT Transaction discarded because of previous errors.\r\n")));
shared.noreplicaserr = makeObjectShared(createObject(
OBJ_STRING, sdsnew("-NOREPLICAS Not enough good replicas to write.\r\n")));
shared.busykeyerr = makeObjectShared(createObject(
OBJ_STRING, sdsnew("-BUSYKEY Target key name already exists.\r\n")));

/* The shared NULL depends on the protocol version. */
shared.null[0] = NULL;
shared.null[1] = NULL;
shared.null[2] = createObject(OBJ_STRING, sdsnew("$-1\r\n"));
shared.null[3] = createObject(OBJ_STRING, sdsnew("_\r\n"));
shared.null[2] = makeObjectShared(createObject(OBJ_STRING, sdsnew("$-1\r\n")));
shared.null[3] = makeObjectShared(createObject(OBJ_STRING, sdsnew("_\r\n")));

shared.nullarray[0] = NULL;
shared.nullarray[1] = NULL;
shared.nullarray[2] = createObject(OBJ_STRING, sdsnew("*-1\r\n"));
shared.nullarray[3] = createObject(OBJ_STRING, sdsnew("_\r\n"));
shared.nullarray[2] = makeObjectShared(createObject(OBJ_STRING, sdsnew("*-1\r\n")));
shared.nullarray[3] = makeObjectShared(createObject(OBJ_STRING, sdsnew("_\r\n")));

shared.emptymap[0] = NULL;
shared.emptymap[1] = NULL;
shared.emptymap[2] = createObject(OBJ_STRING, sdsnew("*0\r\n"));
shared.emptymap[3] = createObject(OBJ_STRING, sdsnew("%0\r\n"));
shared.emptymap[2] = makeObjectShared(createObject(OBJ_STRING, sdsnew("*0\r\n")));
shared.emptymap[3] = makeObjectShared(createObject(OBJ_STRING, sdsnew("%0\r\n")));

shared.emptyset[0] = NULL;
shared.emptyset[1] = NULL;
shared.emptyset[2] = createObject(OBJ_STRING, sdsnew("*0\r\n"));
shared.emptyset[3] = createObject(OBJ_STRING, sdsnew("~0\r\n"));
shared.emptyset[2] = makeObjectShared(createObject(OBJ_STRING, sdsnew("*0\r\n")));
shared.emptyset[3] = makeObjectShared(createObject(OBJ_STRING, sdsnew("~0\r\n")));

for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
char dictid_str[64];
int dictid_len;

dictid_len = ll2string(dictid_str, sizeof(dictid_str), j);
shared.select[j] = createObject(
OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, dictid_str));
}
shared.messagebulk = createStringObject("$7\r\nmessage\r\n", 13);
shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n", 14);
shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n", 15);
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n", 18);
shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17);
shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19);
shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n", 17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n", 19);
shared.select[j] = makeObjectShared(createObject(
OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, dictid_str)));
}
shared.messagebulk = makeObjectShared(createStringObject("$7\r\nmessage\r\n", 13));
shared.pmessagebulk = makeObjectShared(createStringObject("$8\r\npmessage\r\n", 14));
shared.subscribebulk = makeObjectShared(createStringObject("$9\r\nsubscribe\r\n", 15));
shared.unsubscribebulk = makeObjectShared(createStringObject("$11\r\nunsubscribe\r\n", 18));
shared.ssubscribebulk = makeObjectShared(createStringObject("$10\r\nssubscribe\r\n", 17));
shared.sunsubscribebulk = makeObjectShared(createStringObject("$12\r\nsunsubscribe\r\n", 19));
shared.smessagebulk = makeObjectShared(createStringObject("$8\r\nsmessage\r\n", 14));
shared.psubscribebulk = makeObjectShared(createStringObject("$10\r\npsubscribe\r\n", 17));
shared.punsubscribebulk = makeObjectShared(createStringObject("$12\r\npunsubscribe\r\n", 19));

/* Shared command names */
shared.del = createStringObject("DEL", 3);
shared.unlink = createStringObject("UNLINK", 6);
shared.rpop = createStringObject("RPOP", 4);
shared.lpop = createStringObject("LPOP", 4);
shared.lpush = createStringObject("LPUSH", 5);
shared.rpoplpush = createStringObject("RPOPLPUSH", 9);
shared.lmove = createStringObject("LMOVE", 5);
shared.blmove = createStringObject("BLMOVE", 6);
shared.zpopmin = createStringObject("ZPOPMIN", 7);
shared.zpopmax = createStringObject("ZPOPMAX", 7);
shared.multi = createStringObject("MULTI", 5);
shared.exec = createStringObject("EXEC", 4);
shared.hset = createStringObject("HSET", 4);
shared.srem = createStringObject("SREM", 4);
shared.xgroup = createStringObject("XGROUP", 6);
shared.xclaim = createStringObject("XCLAIM", 6);
shared.script = createStringObject("SCRIPT", 6);
shared.replconf = createStringObject("REPLCONF", 8);
shared.pexpireat = createStringObject("PEXPIREAT", 9);
shared.pexpire = createStringObject("PEXPIRE", 7);
shared.persist = createStringObject("PERSIST", 7);
shared.set = createStringObject("SET", 3);
shared.eval = createStringObject("EVAL", 4);
shared.del = makeObjectShared(createStringObject("DEL", 3));
shared.unlink = makeObjectShared(createStringObject("UNLINK", 6));
shared.rpop = makeObjectShared(createStringObject("RPOP", 4));
shared.lpop = makeObjectShared(createStringObject("LPOP", 4));
shared.lpush = makeObjectShared(createStringObject("LPUSH", 5));
shared.rpoplpush = makeObjectShared(createStringObject("RPOPLPUSH", 9));
shared.lmove = makeObjectShared(createStringObject("LMOVE", 5));
shared.blmove = makeObjectShared(createStringObject("BLMOVE", 6));
shared.zpopmin = makeObjectShared(createStringObject("ZPOPMIN", 7));
shared.zpopmax = makeObjectShared(createStringObject("ZPOPMAX", 7));
shared.multi = makeObjectShared(createStringObject("MULTI", 5));
shared.exec = makeObjectShared(createStringObject("EXEC", 4));
shared.hset = makeObjectShared(createStringObject("HSET", 4));
shared.srem = makeObjectShared(createStringObject("SREM", 4));
shared.xgroup = makeObjectShared(createStringObject("XGROUP", 6));
shared.xclaim = makeObjectShared(createStringObject("XCLAIM", 6));
shared.script = makeObjectShared(createStringObject("SCRIPT", 6));
shared.replconf = makeObjectShared(createStringObject("REPLCONF", 8));
shared.pexpireat = makeObjectShared(createStringObject("PEXPIREAT", 9));
shared.pexpire = makeObjectShared(createStringObject("PEXPIRE", 7));
shared.persist = makeObjectShared(createStringObject("PERSIST", 7));
shared.set = makeObjectShared(createStringObject("SET", 3));
shared.eval = makeObjectShared(createStringObject("EVAL", 4));

/* Shared command argument */
shared.left = createStringObject("left", 4);
shared.right = createStringObject("right", 5);
shared.pxat = createStringObject("PXAT", 4);
shared.time = createStringObject("TIME", 4);
shared.retrycount = createStringObject("RETRYCOUNT", 10);
shared.force = createStringObject("FORCE", 5);
shared.justid = createStringObject("JUSTID", 6);
shared.entriesread = createStringObject("ENTRIESREAD", 11);
shared.lastid = createStringObject("LASTID", 6);
shared.default_username = createStringObject("default", 7);
shared.ping = createStringObject("ping", 4);
shared.setid = createStringObject("SETID", 5);
shared.keepttl = createStringObject("KEEPTTL", 7);
shared.absttl = createStringObject("ABSTTL", 6);
shared.load = createStringObject("LOAD", 4);
shared.createconsumer = createStringObject("CREATECONSUMER", 14);
shared.getack = createStringObject("GETACK", 6);
shared.special_asterisk = createStringObject("*", 1);
shared.special_equals = createStringObject("=", 1);
shared.left = makeObjectShared(createStringObject("left", 4));
shared.right = makeObjectShared(createStringObject("right", 5));
shared.pxat = makeObjectShared(createStringObject("PXAT", 4));
shared.time = makeObjectShared(createStringObject("TIME", 4));
shared.retrycount = makeObjectShared(createStringObject("RETRYCOUNT", 10));
shared.force = makeObjectShared(createStringObject("FORCE", 5));
shared.justid = makeObjectShared(createStringObject("JUSTID", 6));
shared.entriesread = makeObjectShared(createStringObject("ENTRIESREAD", 11));
shared.lastid = makeObjectShared(createStringObject("LASTID", 6));
shared.default_username = makeObjectShared(createStringObject("default", 7));
shared.ping = makeObjectShared(createStringObject("ping", 4));
shared.setid = makeObjectShared(createStringObject("SETID", 5));
shared.keepttl = makeObjectShared(createStringObject("KEEPTTL", 7));
shared.absttl = makeObjectShared(createStringObject("ABSTTL", 6));
shared.load = makeObjectShared(createStringObject("LOAD", 4));
shared.createconsumer = makeObjectShared(createStringObject("CREATECONSUMER", 14));
shared.getack = makeObjectShared(createStringObject("GETACK", 6));
shared.special_asterisk = makeObjectShared(createStringObject("*", 1));
shared.special_equals = makeObjectShared(createStringObject("=", 1));
shared.redacted = makeObjectShared(createStringObject("(redacted)", 10));

for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
Expand All @@ -2119,10 +2119,10 @@ void createSharedObjects(void) {
shared.integers[j]->encoding = OBJ_ENCODING_INT;
}
for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
shared.mbulkhdr[j] = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*%d\r\n", j));
shared.bulkhdr[j] = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "$%d\r\n", j));
shared.maphdr[j] = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "%%%d\r\n", j));
shared.sethdr[j] = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "~%d\r\n", j));
shared.mbulkhdr[j] = makeObjectShared(createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*%d\r\n", j)));
shared.bulkhdr[j] = makeObjectShared(createObject(OBJ_STRING, sdscatprintf(sdsempty(), "$%d\r\n", j)));
shared.maphdr[j] = makeObjectShared(createObject(OBJ_STRING, sdscatprintf(sdsempty(), "%%%d\r\n", j)));
shared.sethdr[j] = makeObjectShared(createObject(OBJ_STRING, sdscatprintf(sdsempty(), "~%d\r\n", j)));
}
/* The following two shared objects, minstring and maxstring, are not
* actually used for their value but as a special object meaning
Expand Down
Loading