Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable module API SendClusterMessage to use light message header type #1572

Open
wants to merge 6 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 93 additions & 26 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,13 @@ dictType clusterSdsToListType = {
typedef struct {
enum {
ITER_DICT,
ITER_LIST
ITER_LIST,
ITER_NODE,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helps simplify the code a bit; however, it's just iterating over one node.

} type;
union {
dictIterator di;
listIter li;
clusterNode *node;
};
} ClusterNodeIterator;

Expand All @@ -215,6 +217,11 @@ static void clusterNodeIterInitMyShard(ClusterNodeIterator *iter) {
listRewind(nodes, &iter->li);
}

/* Initialize `iter` as a single-node iterator that refers to `node`. */
static void clusterNodeIterNode(ClusterNodeIterator *iter, clusterNode *node) {
    iter->node = node;
    iter->type = ITER_NODE;
}

static clusterNode *clusterNodeIterNext(ClusterNodeIterator *iter) {
switch (iter->type) {
case ITER_DICT: {
Expand All @@ -229,13 +236,24 @@ static clusterNode *clusterNodeIterNext(ClusterNodeIterator *iter) {
/* Return the value associated with the node, or NULL if no more nodes */
return ln ? listNodeValue(ln) : NULL;
}

case ITER_NODE: {
if (iter->node) {
clusterNode *node = iter->node;
iter->node = NULL;
return node;
}
return NULL;
}
}
serverPanic("Unknown iterator type %d", iter->type);
}

/* Reset the iterator according to its type: a dict-backed iterator is
 * handed to dictResetIterator(), a single-node iterator drops its node
 * pointer, and any other type is left untouched. */
static void clusterNodeIterReset(ClusterNodeIterator *iter) {
    switch (iter->type) {
    case ITER_DICT:
        dictResetIterator(&iter->di);
        break;
    case ITER_NODE:
        iter->node = NULL;
        break;
    default:
        /* Nothing to reset for the remaining iterator types. */
        break;
    }
}

Expand Down Expand Up @@ -988,7 +1006,7 @@ void clusterUpdateMyselfFlags(void) {
int nofailover = server.cluster_replica_no_failover ? CLUSTER_NODE_NOFAILOVER : 0;
myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
myself->flags |= nofailover;
myself->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
myself->flags |= CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED | CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
if (myself->flags != oldflags) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
}
Expand Down Expand Up @@ -3012,18 +3030,27 @@ static void clusterProcessPublishPacket(clusterMsgDataPublish *publish_data, uin
}
}

static void clusterProcessLightPacket(clusterLink *link, uint16_t type) {
/* Dispatch a light-header packet stored in link->rcvbuf.
 *
 * sender: node the packet is attributed to; its name is passed to the module
 *         receivers for MODULE packets.
 * link:   link whose receive buffer holds the clusterMsgLight packet.
 * type:   cluster bus message type of the packet.
 *
 * PUBLISH / PUBLISHSHARD packets go to clusterProcessPublishPacket(), MODULE
 * packets are delivered through moduleCallClusterReceivers(). Any other type
 * trips an assertion. */
static void clusterProcessLightPacket(clusterNode *sender, clusterLink *link, uint16_t type) {
    clusterMsgLight *hdr = (clusterMsgLight *)link->rcvbuf;

    serverLog(LL_DEBUG, "Processing light packet of type: %s", clusterGetMessageTypeString(type));
    if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
        clusterProcessPublishPacket(&hdr->data.publish.msg, type);
    } else if (type == CLUSTERMSG_TYPE_MODULE) {
        uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
        uint32_t len = ntohl(hdr->data.module.msg.len);
        /* Module-level message type. Deliberately not named `type` so it does
         * not shadow the cluster bus message type parameter. */
        uint8_t module_msg_type = hdr->data.module.msg.type;
        unsigned char *payload = hdr->data.module.msg.bulk_data;
        moduleCallClusterReceivers(sender->name, module_id, module_msg_type, payload, len);
    } else {
        serverAssert(0);
    }
}

/* Returns 1 when `type` is one of the message types accepted here for the
 * light header (PUBLISH, PUBLISHSHARD, MODULE), 0 for everything else. */
static inline int messageTypeSupportsLightHdr(uint16_t type) {
    return type == CLUSTERMSG_TYPE_PUBLISH ||
           type == CLUSTERMSG_TYPE_PUBLISHSHARD ||
           type == CLUSTERMSG_TYPE_MODULE;
}
Expand Down Expand Up @@ -3116,8 +3143,14 @@ int clusterIsValidPacket(clusterLink *link) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataUpdate);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr->data.module.msg.len);
if (is_light) {
clusterMsgLight *hdr_light = (clusterMsgLight *)link->rcvbuf;
explen = sizeof(clusterMsgLight) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr_light->data.module.msg.len);
} else {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr->data.module.msg.len);
}
} else {
/* We don't know this type of packet, so we assume it's well formed. */
explen = totlen;
Expand Down Expand Up @@ -3171,7 +3204,7 @@ int clusterProcessPacket(clusterLink *link) {
}
clusterNode *sender = link->node;
sender->data_received = now;
clusterProcessLightPacket(link, type);
clusterProcessLightPacket(sender, link, type);
return 1;
}

Expand All @@ -3188,10 +3221,16 @@ int clusterProcessPacket(clusterLink *link) {

/* Checks if the node supports light message hdr */
if (sender) {
if (flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
if (flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
}

if (flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
}
}

Expand Down Expand Up @@ -4316,22 +4355,50 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
*
* If link is NULL, then the message is broadcasted to the whole cluster. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, const char *payload, uint32_t len) {
uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
msglen += sizeof(clusterMsgModule) - 3 + len;
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MODULE, msglen);

clusterMsg *hdr = getMessageFromSendBlock(msgblock);
hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
hdr->data.module.msg.type = type;
hdr->data.module.msg.len = htonl(len);
memcpy(hdr->data.module.msg.bulk_data, payload, len);

if (link)
clusterSendMessage(link, msgblock);
else
clusterBroadcastMessage(msgblock);
clusterMsgSendBlock *msgblock = NULL, *msgblock_light = NULL;
ClusterNodeIterator iter;

clusterMsgSendBlockDecrRefCount(msgblock);
if (link) {
clusterNodeIterNode(&iter, link->node);
} else {
/* Broadcast to all the nodes. */
clusterNodeIterInitAllNodes(&iter);
}
clusterNode *node;
while ((node = clusterNodeIterNext(&iter)) != NULL) {
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdrForModule(node)) {
if (msgblock_light == NULL) {
uint32_t msglen_light = sizeof(clusterMsgLight) - sizeof(union clusterMsgData);
msglen_light += sizeof(clusterMsgModule) - 3 + len;
int msgtype_light = CLUSTERMSG_TYPE_MODULE;
msgtype_light |= CLUSTERMSG_LIGHT;
msgblock_light = createClusterMsgSendBlock(msgtype_light, msglen_light);
clusterMsgLight *hdr = getLightMessageFromSendBlock(msgblock_light);
hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
hdr->data.module.msg.type = type;
hdr->data.module.msg.len = htonl(len);
memcpy(hdr->data.module.msg.bulk_data, payload, len);
}
clusterSendMessage(node->link, msgblock_light);
} else {
if (msgblock == NULL) {
uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
msglen += sizeof(clusterMsgModule) - 3 + len;
int msgtype = CLUSTERMSG_TYPE_MODULE;
msgblock = createClusterMsgSendBlock(msgtype, msglen);
clusterMsg *hdr = getMessageFromSendBlock(msgblock);
hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
hdr->data.module.msg.type = type;
hdr->data.module.msg.len = htonl(len);
memcpy(hdr->data.module.msg.bulk_data, payload, len);
}
clusterSendMessage(node->link, msgblock);
}
}
clusterNodeIterReset(&iter);
if (msgblock != NULL) clusterMsgSendBlockDecrRefCount(msgblock);
if (msgblock_light != NULL) clusterMsgSendBlockDecrRefCount(msgblock_light);
}

/* This function gets a cluster node ID string as target, the same way the nodes
Expand Down Expand Up @@ -4381,7 +4448,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterNode *node;
while ((node = clusterNodeIterNext(&iter)) != NULL) {
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node)) {
if (nodeSupportsLightMsgHdrForPubSub(node)) {
clusterSendMessage(node->link, msgblock_light);
} else {
if (msgblock == NULL) {
Expand Down
28 changes: 15 additions & 13 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,19 @@ typedef struct clusterLink {
} clusterLink;

/* Cluster node flags and macros. */
#define CLUSTER_NODE_PRIMARY (1 << 0) /* The node is a primary */
#define CLUSTER_NODE_REPLICA (1 << 1) /* The node is a replica */
#define CLUSTER_NODE_PFAIL (1 << 2) /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL (1 << 3) /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF (1 << 4) /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE (1 << 5) /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR (1 << 6) /* We don't know the address of this node */
#define CLUSTER_NODE_MEET (1 << 7) /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO (1 << 8) /* Primary eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_SUPPORTED (1 << 11) /* This node supports light pubsub message header. */
#define CLUSTER_NODE_PRIMARY (1 << 0) /* The node is a primary */
#define CLUSTER_NODE_REPLICA (1 << 1) /* The node is a replica */
#define CLUSTER_NODE_PFAIL (1 << 2) /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL (1 << 3) /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF (1 << 4) /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE (1 << 5) /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR (1 << 6) /* We don't know the address of this node */
#define CLUSTER_NODE_MEET (1 << 7) /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO (1 << 8) /* Primary eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED (1 << 11) /* This node supports light message header for publish type. */
#define CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED (1 << 12) /* This node supports light message header for module type. */
#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
Expand All @@ -67,7 +68,8 @@ typedef struct clusterLink {
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
#define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED)
#define nodeSupportsLightMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED)
#define nodeSupportsLightMsgHdrForPubSub(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED)
#define nodeSupportsLightMsgHdrForModule(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED)
#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))

/* This structure represent elements of node->fail_reports. */
Expand Down
2 changes: 2 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -8980,6 +8980,8 @@ void VM_RegisterClusterMessageReceiver(ValkeyModuleCtx *ctx,
/* Send a message to all the nodes in the cluster if `target` is NULL, otherwise
* at the specified target, which is a VALKEYMODULE_NODE_ID_LEN bytes node ID, as
* returned by the receiver callback or by the nodes iteration functions.
* This API incurs less overhead if the target node supports the light message header type (~30B);
* otherwise it falls back to the regular message header overhead (~2KB).
*
* The function returns VALKEYMODULE_OK if the message was successfully sent,
* otherwise if the node is not connected or such node ID does not map to any
Expand Down
39 changes: 39 additions & 0 deletions tests/modules/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,52 @@ int test_cluster_shards(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
return VALKEYMODULE_OK;
}

#define MSGTYPE_PING 1
#define MSGTYPE_PONG 2

/* test.pingall
 *
 * Sends a MSGTYPE_PING cluster message with the payload "Hey" using a NULL
 * target, then replies with the simple string "OK". Command arguments are
 * ignored. */
int PingallCommand_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
    static const char ping_payload[] = "Hey";

    VALKEYMODULE_NOT_USED(argc);
    VALKEYMODULE_NOT_USED(argv);

    /* Length excludes the terminating NUL, i.e. 3 bytes. */
    ValkeyModule_SendClusterMessage(ctx, NULL, MSGTYPE_PING, ping_payload, sizeof(ping_payload) - 1);

    return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}

/* Cluster message callback for MSGTYPE_PING.
 *
 * Logs the sender and payload, sends a MSGTYPE_PONG ("Ohi!") with a NULL
 * target, and runs INCR on the "pings_received" key. */
void PingReceiver(ValkeyModuleCtx *ctx,
                  const char *sender_id,
                  uint8_t type,
                  const unsigned char *payload,
                  uint32_t len) {
    static const char pong_payload[] = "Ohi!";
    const int payload_len = (int)len;

    ValkeyModule_Log(ctx, "notice", "PING (type %d) RECEIVED from %.*s: '%.*s'", type, VALKEYMODULE_NODE_ID_LEN,
                     sender_id, payload_len, payload);

    /* Length excludes the terminating NUL, i.e. 4 bytes. */
    ValkeyModule_SendClusterMessage(ctx, NULL, MSGTYPE_PONG, pong_payload, sizeof(pong_payload) - 1);

    /* The INCR result itself is not needed; free the reply right away. */
    ValkeyModuleCallReply *incr_reply = ValkeyModule_Call(ctx, "INCR", "c", "pings_received");
    ValkeyModule_FreeCallReply(incr_reply);
}

/* Cluster message callback for MSGTYPE_PONG.
 *
 * Only logs the message type, the sender ID and the payload. */
void PongReceiver(ValkeyModuleCtx *ctx,
                  const char *sender_id,
                  uint8_t type,
                  const unsigned char *payload,
                  uint32_t len) {
    const int payload_len = (int)len;

    ValkeyModule_Log(ctx, "notice", "PONG (type %d) RECEIVED from %.*s: '%.*s'", type, VALKEYMODULE_NODE_ID_LEN,
                     sender_id, payload_len, payload);
}

int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);

if (ValkeyModule_Init(ctx, "cluster", 1, VALKEYMODULE_APIVER_1)== VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "test.pingall", PingallCommand_ValkeyCommand, "readonly", 0, 0, 0) ==
VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "test.cluster_slots", test_cluster_slots, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

Expand Down
19 changes: 19 additions & 0 deletions tests/unit/moduleapi/cluster.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@ source tests/support/cli.tcl
# cluster creation is complicated with TLS, and the current tests don't really need that coverage
tags {tls:skip external:skip cluster modules} {

# Start a cluster with the cluster test module loaded through config lines.
# NOTE(review): "start_cluster 3 0" presumably means 3 primaries and 0 replicas - confirm against the helper.
set testmodule [file normalize tests/modules/cluster.so]
set modules [list loadmodule $testmodule]
start_cluster 3 0 [list config_lines $modules] {
# Client handles for the three servers started above.
set node1 [srv 0 client]
set node2 [srv -1 client]
set node3 [srv -2 client]

test "Cluster module send message API - VM_SendClusterMessage" {
# test.pingall sends one module message with a NULL target; the sender is
# expected to report 2 module messages sent.
# NOTE(review): presumably one per peer node - confirm.
assert_equal OK [$node1 test.pingall]
assert_equal 2 [CI 0 cluster_stats_messages_module_sent]
# Poll up to 50 times, 100 ms apart, until both other nodes report one received module message.
wait_for_condition 50 100 {
[CI 1 cluster_stats_messages_module_received] eq 1 &&
[CI 2 cluster_stats_messages_module_received] eq 1
} else {
fail "node 2 or node 3 didn't receive cluster module message"
}
}
}

set testmodule_nokey [file normalize tests/modules/blockonbackground.so]
set testmodule_blockedclient [file normalize tests/modules/blockedclient.so]
set testmodule [file normalize tests/modules/blockonkeys.so]
Expand Down
Loading