Skip to content

Commit

Permalink
Add new replicate module API to bypass command validation
Browse files Browse the repository at this point in the history
Signed-off-by: Seungmin Lee <[email protected]>
  • Loading branch information
Seungmin Lee committed Jan 15, 2025
1 parent c5a1585 commit 7127d24
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 27 deletions.
66 changes: 42 additions & 24 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ static void zsetKeyReset(ValkeyModuleKey *key);
static void moduleInitKeyTypeSpecific(ValkeyModuleKey *key);
void VM_FreeDict(ValkeyModuleCtx *ctx, ValkeyModuleDict *d);
void VM_FreeServerInfo(ValkeyModuleCtx *ctx, ValkeyModuleServerInfoData *data);
int moduleReplicate(ValkeyModuleCtx *ctx, ValkeyModuleFlag flag, const char *cmdname, const char *fmt, va_list ap);

/* Helpers for VM_SetCommandInfo. */
static int moduleValidateCommandInfo(const ValkeyModuleCommandInfo *info);
Expand Down Expand Up @@ -3622,34 +3623,22 @@ int VM_ReplyWithLongDouble(ValkeyModuleCtx *ctx, long double ld) {
* The command returns VALKEYMODULE_ERR if the format specifiers are invalid
* or the command name does not belong to a known command. */
int VM_Replicate(ValkeyModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
struct serverCommand *cmd;
robj **argv = NULL;
int argc = 0, flags = 0, j;
va_list ap;

cmd = lookupCommandByCString((char *)cmdname);
if (!cmd) return VALKEYMODULE_ERR;

/* Create the client and dispatch the command. */
va_start(ap, fmt);
argv = moduleCreateArgvFromUserFormat(cmdname, fmt, &argc, &flags, ap);
int result = moduleReplicate(ctx, VALKEYMODULE_FLAG_DEFAULT, cmdname, fmt, ap);
va_end(ap);
if (argv == NULL) return VALKEYMODULE_ERR;

/* Select the propagation target. Usually is AOF + replicas, however
* the caller can exclude one or the other using the "A" or "R"
* modifiers. */
int target = 0;
if (!(flags & VALKEYMODULE_ARGV_NO_AOF)) target |= PROPAGATE_AOF;
if (!(flags & VALKEYMODULE_ARGV_NO_REPLICAS)) target |= PROPAGATE_REPL;

alsoPropagate(ctx->client->db->id, argv, argc, target);
return result;
}

/* Release the argv. */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
zfree(argv);
server.dirty++;
return VALKEYMODULE_OK;
/* Same as ValkeyModule_Replicate, but can take ValkeyModuleFlag
* Can be either VALKEYMODULE_FLAG_DEFAULT, which means default behavior
* (same as calling ValkeyModule_Replicate) */
int VM_ReplicateWithFlag(ValkeyModuleCtx *ctx, ValkeyModuleFlag flag, const char *cmdname, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
int result = moduleReplicate(ctx, flag, cmdname, fmt, ap);
va_end(ap);
return result;
}

/* This function will replicate the command exactly as it was invoked
Expand Down Expand Up @@ -13680,6 +13669,34 @@ void moduleDefragGlobals(void) {
dictReleaseIterator(di);
}

/* Helper function for VM_Replicate and VM_ReplicateWithFlag to replicate the specified command
* and arguments to replicas and AOF, as effect of execution of the calling command implementation.
* Skip command validation if the ValkeyModuleFlag is set to VALKEYMODULE_FLAG_SKIP_VALIDATION. */
int moduleReplicate(ValkeyModuleCtx *ctx, ValkeyModuleFlag flag, const char *cmdname, const char *fmt, va_list ap) {
struct serverCommand *cmd;
robj **argv = NULL;
int argc = 0, flags = 0, j;
if (flag != VALKEYMODULE_FLAG_SKIP_VALIDATION) {
cmd = lookupCommandByCString((char *)cmdname);
if (!cmd) return VALKEYMODULE_ERR;
}
/* Create the client and dispatch the command. */
argv = moduleCreateArgvFromUserFormat(cmdname, fmt, &argc, &flags, ap);
if (argv == NULL) return VALKEYMODULE_ERR;
/* Select the propagation target. Usually is AOF + replicas, however
* the caller can exclude one or the other using the "A" or "R"
* modifiers. */
int target = 0;
if (!(flags & VALKEYMODULE_ARGV_NO_AOF)) target |= PROPAGATE_AOF;
if (!(flags & VALKEYMODULE_ARGV_NO_REPLICAS)) target |= PROPAGATE_REPL;
alsoPropagate(ctx->client->db->id, argv, argc, target);
/* Release the argv. */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
zfree(argv);
server.dirty++;
return VALKEYMODULE_OK;
}

/* Returns the name of the key currently being processed.
* There is no guarantee that the key name is always available, so this may return NULL.
*/
Expand Down Expand Up @@ -13793,6 +13810,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringPtrLen);
REGISTER_API(AutoMemory);
REGISTER_API(Replicate);
REGISTER_API(ReplicateWithFlag);
REGISTER_API(ReplicateVerbatim);
REGISTER_API(DeleteKey);
REGISTER_API(UnlinkKey);
Expand Down
8 changes: 8 additions & 0 deletions src/valkeymodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,11 @@ typedef enum {
VALKEYMODULE_ACL_LOG_CHANNEL /* Channel authorization failure */
} ValkeyModuleACLLogEntryReason;

typedef enum {
VALKEYMODULE_FLAG_DEFAULT = 0, /* Default behavior */
VALKEYMODULE_FLAG_SKIP_VALIDATION, /* Skip validation */
} ValkeyModuleFlag;

/* Incomplete structures needed by both the core and modules. */
typedef struct ValkeyModuleCtx ValkeyModuleCtx;
typedef struct ValkeyModuleIO ValkeyModuleIO;
Expand Down Expand Up @@ -1180,6 +1185,8 @@ VALKEYMODULE_API int (*ValkeyModule_StringToStreamID)(const ValkeyModuleString *
VALKEYMODULE_API void (*ValkeyModule_AutoMemory)(ValkeyModuleCtx *ctx) VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_Replicate)(ValkeyModuleCtx *ctx, const char *cmdname, const char *fmt, ...)
VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_ReplicateWithFlag)(ValkeyModuleCtx *ctx, ValkeyModuleFlag flag, const char *cmdname, const char *fmt, ...)
VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_ReplicateVerbatim)(ValkeyModuleCtx *ctx) VALKEYMODULE_ATTR;
VALKEYMODULE_API const char *(*ValkeyModule_CallReplyStringPtr)(ValkeyModuleCallReply *reply,
size_t *len)VALKEYMODULE_ATTR;
Expand Down Expand Up @@ -1847,6 +1854,7 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in
VALKEYMODULE_GET_API(StringPtrLen);
VALKEYMODULE_GET_API(AutoMemory);
VALKEYMODULE_GET_API(Replicate);
VALKEYMODULE_GET_API(ReplicateWithFlag);
VALKEYMODULE_GET_API(ReplicateVerbatim);
VALKEYMODULE_GET_API(DeleteKey);
VALKEYMODULE_GET_API(UnlinkKey);
Expand Down
25 changes: 22 additions & 3 deletions tests/modules/propagate.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ int propagateTestSimpleCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,

/* Replicate two commands to test MULTI/EXEC wrapping. */
ValkeyModule_Replicate(ctx,"INCR","c","counter-1");
ValkeyModule_Replicate(ctx,"INCR","c","counter-2");
ValkeyModule_ReplicateWithFlag(ctx, VALKEYMODULE_FLAG_SKIP_VALIDATION, "INCR",
"c", "counter-2");
ValkeyModule_ReplyWithSimpleString(ctx,"OK");
return VALKEYMODULE_OK;
}
Expand All @@ -266,15 +267,28 @@ int propagateTestMixedCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, i
ValkeyModule_FreeCallReply(reply);

ValkeyModule_Replicate(ctx,"INCR","c","counter-1");
ValkeyModule_Replicate(ctx,"INCR","c","counter-2");

ValkeyModule_ReplicateWithFlag(ctx, VALKEYMODULE_FLAG_SKIP_VALIDATION, "INCR",
"c", "counter-2");
reply = ValkeyModule_Call(ctx, "INCR", "c!", "after-call");
ValkeyModule_FreeCallReply(reply);

ValkeyModule_ReplyWithSimpleString(ctx,"OK");
return VALKEYMODULE_OK;
}

int propagateTestInvalidCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
/* Replicate two commands to test MULTI/EXEC wrapping. */
ValkeyModule_ReplicateWithFlag(ctx, VALKEYMODULE_FLAG_SKIP_VALIDATION, "INVALID",
"c", "counter-1");
ValkeyModule_ReplicateWithFlag(ctx, VALKEYMODULE_FLAG_SKIP_VALIDATION, "INVALID",
"c", "counter-2");
ValkeyModule_ReplyWithSimpleString(ctx, "OK");
return VALKEYMODULE_OK;
}

int propagateTestNestedCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc)
{
VALKEYMODULE_NOT_USED(argv);
Expand Down Expand Up @@ -380,6 +394,11 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
"write",1,1,1) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "propagate-test.invalid",
propagateTestInvalidCommand, "", 1, 1,
1) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx,"propagate-test.nested",
propagateTestNestedCommand,
"write",1,1,1) == VALKEYMODULE_ERR)
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/moduleapi/propagate.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,32 @@ tags "modules" {
}
}

tags "modules" {
start_server [list overrides [list loadmodule "$testmodule"]] {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
start_server [list overrides [list loadmodule "$testmodule"]] {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
# Start the replication process...
$replica replicaof $master_host $master_port
wait_for_sync $replica
after 1000
test {module crash when propagating invalid command} {
$master propagate-test.invalid
catch {wait_for_sync $replica}

wait_for_log_messages -1 {"*=== * BUG REPORT START: Cut & paste starting from here ===*"} 0 10 1000
wait_for_log_messages -1 {"* This replica panicked sending an error to its primary after processing the command '<unknown>' *"} 0 10 1000

assert_equal 1 [count_log_message -1 "=== .* BUG REPORT START: Cut & paste starting from here ==="]
assert_equal 1 [count_log_message -1 "This replica panicked sending an error to its primary after processing the command '<unknown>'"]
}
}
}
}

tags "modules aof" {
foreach aofload_type {debug_cmd startup} {
Expand Down

0 comments on commit 7127d24

Please sign in to comment.