diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index 997f3c646e..8ef6c14348 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -132,13 +132,9 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon } } -static int canAddNetworkBytesOut(int slot) { - return clusterSlotStatsEnabled() && slot != -1; -} - /* Accumulates egress bytes for the slot. */ void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) { - if (!canAddNetworkBytesOut(slot)) return; + if (!clusterSlotStatsEnabled(slot)) return; serverAssert(slot >= 0 && slot < CLUSTER_SLOTS); server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out; @@ -152,7 +148,7 @@ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) { /* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */ static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) { client *c = server.current_client; - if (c == NULL || !canAddNetworkBytesOut(c->slot)) return; + if (c == NULL || !clusterSlotStatsEnabled(c->slot)) return; serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); serverAssert(nodeIsPrimary(server.cluster->myself)); @@ -179,7 +175,7 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) { * This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation. * This function covers the internal propagation component. */ void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) { - if (!canAddNetworkBytesOut(slot)) return; + if (!clusterSlotStatsEnabled(slot)) return; serverAssert(slot >= 0 && slot < CLUSTER_SLOTS); server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd; @@ -214,8 +210,7 @@ void clusterSlotStatResetAll(void) { * would equate to repeating the same calculation twice. */ static int canAddCpuDuration(client *c) { - return clusterSlotStatsEnabled() && - c->slot != -1 && /* Command should be slot specific. */ + return clusterSlotStatsEnabled(c->slot) && (!server.execution_nesting || /* Either; */ (server.execution_nesting && /* 1) Command should not be nested, or */ c->realcmd->flags & CMD_BLOCKING)); /* 2) If command is nested, it must be due to unblocking. */ @@ -242,8 +237,7 @@ static int canAddNetworkBytesIn(client *c) { * Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. * Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of * EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */ - return clusterSlotStatsEnabled() && c->slot != -1 && !(c->flag.blocked) && - !server.in_exec; + return clusterSlotStatsEnabled(c->slot) && !(c->flag.blocked) && !server.in_exec; } /* Adds network ingress bytes of the current command in execution, @@ -338,7 +332,6 @@ void clusterSlotStatsCommand(client *c) { } } -int clusterSlotStatsEnabled(void) { - return server.cluster_slot_stats_enabled && /* Config should be enabled. */ - server.cluster_enabled; /* Cluster mode should be enabled. */ +int clusterSlotStatsEnabled(int slot) { + return server.cluster_slot_stats_enabled && server.cluster_enabled && slot != -1; } diff --git a/src/cluster_slot_stats.h b/src/cluster_slot_stats.h index 3a78fa309f..f5c103e9ed 100644 --- a/src/cluster_slot_stats.h +++ b/src/cluster_slot_stats.h @@ -6,7 +6,7 @@ /* General use-cases. */ void clusterSlotStatReset(int slot); void clusterSlotStatResetAll(void); -int clusterSlotStatsEnabled(void); +int clusterSlotStatsEnabled(int slot); /* cpu-usec metric. */ void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration); diff --git a/src/config.c b/src/config.c index 4ba4e6f627..79fbe70d66 100644 --- a/src/config.c +++ b/src/config.c @@ -3188,7 +3188,6 @@ standardConfig static_configs[] = { createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), - createBoolConfig("reply-offload", NULL, MODIFIABLE_CONFIG, server.reply_offload_enabled, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), @@ -3251,6 +3250,8 @@ standardConfig static_configs[] = { createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), + createIntConfig("min-io-threads-reply-offload-on", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_for_reply_offload, 7, INTEGER_CONFIG, NULL, NULL), + createIntConfig("min-io-threads-value-prefetch-off", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_value_prefetch_off, 10, INTEGER_CONFIG, NULL, NULL), createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */ diff --git a/src/io_threads.c b/src/io_threads.c index 3b14e7a177..b63907caa3 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -618,3 +618,13 @@ int trySendAcceptToIOThreads(connection *conn) { return C_OK; } + +int isReplyOffloadIndicatedByIOThreads(void) { + /* Starting min_io_threads_for_reply_offload I/O threads reply offload should be beneficial for any string size */ + return server.min_io_threads_for_reply_offload && server.io_threads_num >= server.min_io_threads_for_reply_offload; +} + +int isValuePrefetchIndicatedByIOThreads(void) { + /* Starting min_io_threads_value_prefetch_off I/O threads reply offload should be more efficient without value prefetch */ + return server.io_threads_num < server.min_io_threads_value_prefetch_off; +} diff --git a/src/io_threads.h b/src/io_threads.h index a3ff582a77..dc430abd0b 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -14,5 +14,7 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only); void drainIOThreadsQueue(void); void trySendPollJobToIOThreads(void); int trySendAcceptToIOThreads(connection *conn); +int isReplyOffloadIndicatedByIOThreads(void); +int isValuePrefetchIndicatedByIOThreads(void); #endif /* IO_THREADS_H */ diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index 7726749ad0..5910226c77 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -9,6 +9,7 @@ #include "memory_prefetch.h" #include "server.h" +#include "io_threads.h" typedef enum { PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */ @@ -119,9 +120,8 @@ static void prefetchEntry(KeyPrefetchInfo *info) { if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) { /* Not done yet */ moveToNextKey(); - /* If reply offload enabled no need to prefetch value because main thread will not access it */ - } else if (server.reply_offload_enabled) { - markKeyAsdone(info); + } else if (!isValuePrefetchIndicatedByIOThreads()) { + markKeyAsdone(info); } else { info->state = PREFETCH_VALUE; } diff --git a/src/networking.c b/src/networking.c index bdfc9a572c..030e668bfb 100644 --- a/src/networking.c +++ b/src/networking.c @@ -42,7 +42,6 @@ #include #include #include -#include #include /* This struct is used to encapsulate filtering criteria for operations on clients @@ -68,17 +67,16 @@ typedef struct { } clientFilter; typedef enum { - CLIENT_REPLY_PAYLOAD_DATA = 0, - CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD, -} clientReplyPayloadType; + CLIENT_REPLY_PLAIN = 0, + CLIENT_REPLY_BULK_OFFLOAD, +} clientReplyType; /* Reply payload header */ typedef struct __attribute__((__packed__)) payloadHeader { - size_t len; /* payload length in a reply buffer */ - size_t actual_len; /* actual reply length after offload expanding */ - uint8_t type; /* one of clientReplyPayloadType */ - int16_t slot; /* to report network-bytes-out for offloads */ - + size_t len; /* payload length in a reply buffer */ + size_t actual_len; /* actual reply length for bulk offloads */ + uint8_t type; /* one of clientReplyType */ + int16_t slot; /* to report network-bytes-out for offloads */ } payloadHeader; static void setProtocolError(const char *errstr, client *c); @@ -164,21 +162,19 @@ static inline int isReplicaReadyForReplData(client *replica) { !(replica->flag.close_asap); } -/* - * Reply offload can be allowed only for regular Valkey clients - * that use _writeToClient handler to write replies to client connection - */ -static bool isReplyOffloadAllowable(client *c) { +/* Reply offload can be allowed only for regular Valkey clients + * that use _writeToClient handler to write replies to client connection */ +static int isReplyOffloadAllowable(client *c) { if (c->flag.fake) { - return false; + return 0; } switch (getClientType(c)) { - case CLIENT_TYPE_NORMAL: - case CLIENT_TYPE_PUBSUB: - return true; - default: - return false; + case CLIENT_TYPE_NORMAL: + case CLIENT_TYPE_PUBSUB: + return 1; + default: + return 0; } } @@ -316,15 +312,15 @@ void putClientInPendingWriteQueue(client *c) { } } -/* - * Activate/deactivate reply offload for the client - * according to server config - */ -static void updateReplyOffloadFlag(client *c) { - if (server.reply_offload_enabled && !c->flag.reply_offload && isReplyOffloadAllowable(c)) { +/* Try to activate reply offload for a client + * There should be enough I/O threads for reply offload + * to be beneficial for performance + * The client should be allowable for reply offload */ +static void tryToActivateReplyOffload(client *c) { + if (c->flag.reply_offload) return; + + if (isReplyOffloadIndicatedByIOThreads() && isReplyOffloadAllowable(c)) { c->flag.reply_offload = 1; - } else if (!server.reply_offload_enabled && c->flag.reply_offload) { - c->flag.reply_offload = 0; } } @@ -376,7 +372,7 @@ int prepareClientToWrite(client *c) { * it should already be setup to do so (it has already pending data). */ if (!clientHasPendingReplies(c)) { /* We can change reply offload mode for the client only when its reply buffers are empty. */ - updateReplyOffloadFlag(c); + tryToActivateReplyOffload(c); putClientInPendingWriteQueue(c); } @@ -430,14 +426,13 @@ void deleteCachedResponseClient(client *recording_client) { /* ----------------------------------------------------------------------------- * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ -static inline void insertPayloadHeader(char *buf, size_t *bufpos, uint8_t type, size_t len, int slot, payloadHeader **last_header) { - /* Save the latest header */ - *last_header = (payloadHeader *)(buf + *bufpos); +static inline void insertPayloadHeader(char *buf, size_t *bufpos, uint8_t type, size_t len, int slot, payloadHeader **new_header) { + *new_header = (payloadHeader *)(buf + *bufpos); - (*last_header)->type = type; - (*last_header)->len = len; - (*last_header)->slot = slot; - (*last_header)->actual_len = 0; + (*new_header)->type = type; + (*new_header)->len = len; + (*new_header)->slot = slot; + (*new_header)->actual_len = 0; *bufpos += sizeof(payloadHeader); } @@ -452,25 +447,25 @@ static inline int updatePayloadHeader(payloadHeader *last_header, uint8_t type, static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **last_header, uint8_t type, size_t len, int slot, size_t available) { /* Enforce min len for offloads as whole pointers must be written to the buffer */ - size_t min_len = (type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD ? len : 1); + size_t min_len = (type == CLIENT_REPLY_BULK_OFFLOAD ? len : 1); if (min_len > available) return 0; size_t reply_len = min(available, len); // If cluster slots stats disabled set slot to -1 to prevent excessive per slot headers - if (!clusterSlotStatsEnabled()) slot = -1; + if (!clusterSlotStatsEnabled(slot)) slot = -1; /* Try to add payload to last chunk if possible */ if (*last_header != NULL) { if (updatePayloadHeader(*last_header, type, reply_len, slot) == C_OK) return reply_len; } - /* Recheck min len condition and recalcuate allowed len with a new header to be added */ + /* Recheck min len condition and recalculate allowed len with a new header to be added */ if (sizeof(payloadHeader) + min_len > available) return 0; available -= sizeof(payloadHeader); if (len > available) reply_len = available; /* Start a new payload chunk */ - insertPayloadHeader(buf, bufpos, type, reply_len, slot, last_header); + insertPayloadHeader(buf, bufpos, type, reply_len, slot, last_header); return reply_len; } @@ -500,12 +495,8 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le return reply_len; } -size_t _addReplyToBuffer(client *c, const char *s, size_t len) { - return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PAYLOAD_DATA); -} - -size_t _addBulkOffloadToBuffer(client *c, robj *obj) { - return _addReplyPayloadToBuffer(c, &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); +static size_t _addReplyToBuffer(client *c, const char *s, size_t len) { + return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PLAIN); } /* Adds the reply to the reply linked list. @@ -558,11 +549,7 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl } void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) { - _addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PAYLOAD_DATA); -} - -void _addBulkOffloadToList(client *c, robj *obj) { - _addReplyPayloadToList(c, c->reply, (char*) &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); + _addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PLAIN); } /* The subscribe / unsubscribe command family has a push as a reply, @@ -613,11 +600,15 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { void _addBulkOffloadToBufferOrList(client *c, robj *obj) { if (c->flag.close_after_reply) return; - /* Refcount will be decremented in post write handler (i.e. in _postWriteToClient) */ + /* Refcount will be decremented in write completion handler by the main thread */ incrRefCount(obj); - if (!_addBulkOffloadToBuffer(c, obj)) { - _addBulkOffloadToList(c, obj); + /* Put pointer to string in addition to pointer to object in a buffer + * I/O thread will use (access) only pointer to string. + * It will eliminate expensive memory access to object by I/O thread */ + void* obj_ptrs[2] = {obj, obj->ptr}; + if (!_addReplyPayloadToBuffer(c, obj_ptrs, sizeof(obj_ptrs), CLIENT_REPLY_BULK_OFFLOAD)) { + _addReplyPayloadToList(c, c->reply, (char *)obj_ptrs, sizeof(obj_ptrs), CLIENT_REPLY_BULK_OFFLOAD); } } @@ -974,7 +965,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { * - It has enough room already allocated * - And not too large (avoid large memmove) * - And the client is not in a pending I/O state */ - if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size - prev->used > 0 && + if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size > prev->used && c->io_write_state != CLIENT_PENDING_IO && !c->flag.reply_offload) { size_t len_to_copy = prev->size - prev->used; if (len_to_copy > length) len_to_copy = length; @@ -1004,7 +995,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { buf->used = 0; buf->last_header = 0; if (c->flag.reply_offload) { - upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PAYLOAD_DATA, length, c->slot, buf->size); + upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PLAIN, length, c->slot, buf->size); } memcpy(buf->buf + buf->used, s, length); buf->used += length; @@ -1272,10 +1263,11 @@ void addReplyBulkLen(client *c, robj *obj) { } int tryOffloadBulkReply(client *c, robj *obj) { - if (!c->flag.reply_offload) return C_ERR; + if (!isReplyOffloadIndicatedByIOThreads()) return C_ERR; if (obj->encoding != OBJ_ENCODING_RAW) return C_ERR; if (obj->refcount == OBJ_STATIC_REFCOUNT) return C_ERR; if (prepareClientToWrite(c) != C_OK) return C_ERR; + if (!c->flag.reply_offload) return C_ERR; _addBulkOffloadToBufferOrList(c, obj); @@ -2137,10 +2129,10 @@ typedef struct replyIOV { int limit_reached; /* Non zero if either max iov count or NET_MAX_WRITES_PER_EVENT limit * reached during iov prepearation */ int offload_active; - int prfxcnt; /* prfxcnt, prefixes and clrf are auxiliary fields - * for expanding reply offloads */ - char (*prefixes)[LONG_STR_SIZE + 3]; - char *crlf; + int prfxcnt; /* prfxcnt, prefixes and clrf are auxiliary fields + * for expanding reply offloads */ + char (*prefixes)[LONG_STR_SIZE + 3]; /* bulk string prefixes */ + char *crlf; /* bulk string suffix */ } replyIOV; /* @@ -2178,10 +2170,11 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, return; } - /* Aggregate data len from the beginning of the buffer even though - * part of the data should be skipped in this round due to last_written_len */ + /* Aggregate data length from the beginning of the buffer even though + * part of the data can be skipped in this writevToClient invocation due to last_written_len */ metadata->data_len += buf_len; + /* Skip data written in the previous writevToClient invocation(s) */ if (reply->last_written_len >= buf_len) { reply->last_written_len -= buf_len; return; @@ -2195,12 +2188,16 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, } static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) { + void **obj_ptrs = (void **)buf; while (buf_len > 0 && !reply->limit_reached) { - robj **obj = (robj **)buf; - char *str = (*obj)->ptr; - size_t str_len = stringObjectLen(*obj); - - char* prefix = reply->prefixes[reply->prfxcnt]; + /* Skip pointer to object */ + obj_ptrs++; + /* Use pointer to string */ + void* str = *obj_ptrs; + size_t str_len = sdslen(str); + + /* RESP encodes bulk strings as $\r\n\r\n */ + char *prefix = reply->prefixes[reply->prfxcnt]; prefix[0] = '$'; size_t num_len = ll2string(prefix + 1, sizeof(reply->prefixes[0]) - 3, str_len); prefix[num_len + 1] = '\r'; @@ -2208,25 +2205,24 @@ static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *repl int cnt = reply->cnt; addPlainBufferToReplyIOV(reply->prefixes[reply->prfxcnt], num_len + 3, reply, metadata); - /* Increment prfxcnt only if prefix was added to reply in this round */ + /* Increment prfxcnt only if prefix was added to reply in this writevToClient invocation */ if (reply->cnt > cnt) reply->prfxcnt++; addPlainBufferToReplyIOV(str, str_len, reply, metadata); addPlainBufferToReplyIOV(reply->crlf, 2, reply, metadata); - buf += sizeof(void*); - buf_len -= sizeof(void*); + obj_ptrs++; + buf_len -= sizeof(void *) * 2; } } static void addCompoundBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) { char *ptr = buf; while (ptr < buf + bufpos && !reply->limit_reached) { - payloadHeader *header = (payloadHeader*)ptr; + payloadHeader *header = (payloadHeader *)ptr; ptr += sizeof(payloadHeader); - if (header->type == CLIENT_REPLY_PAYLOAD_DATA) { + if (header->type == CLIENT_REPLY_PLAIN) { addPlainBufferToReplyIOV(ptr, header->len, reply, metadata); } else { - serverAssert(header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); uint64_t data_len = metadata->data_len; addOffloadedBulkToReplyIOV(ptr, header->len, reply, metadata); /* Store actual reply len for cluster slot stats */ @@ -2234,7 +2230,6 @@ static void addCompoundBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *repl } ptr += header->len; } - serverAssert(ptr <= buf + bufpos); } static void addBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) { @@ -2252,6 +2247,19 @@ static void addBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWr metadata->bufpos = bufpos; } +/* + * This function calculates and stores on the client next: + * io_last_written_buf - Last buffer that has been written to the client connection + * io_last_written_bufpos - The buffer has been written until this position + * io_last_written_data_len - The actual length of the data written from this buffer + * This length differs from written bufpos in case of reply offload + * + * The io_last_written_buf and io_last_written_bufpos are used by _postWriteToClient + * to detect last client reply buffer that can be released + * + * The io_last_written_data_len is used by writevToClient for resuming write from the point + * where previous writevToClient invocation stopped + **/ static void saveLastWrittenBuf(client *c, bufWriteMetadata *metadata, int bufcnt, size_t totlen, size_t totwritten) { int last = bufcnt - 1; if (totwritten == totlen) { @@ -2286,45 +2294,26 @@ void proceedToUnwritten(replyIOV *reply, int nwritten) { } } +/* Bulk string prefix max size */ +#define BULK_STRING_LEN_PREFIX_MAX_SIZE (LONG_STR_SIZE + 3) +/* Bulk string offload requires 3 iov entries - + * length prefix ($\r\n), string () and suffix (\r\n) */ +#define NUM_OF_IOV_PER_BULK_OFFLOAD 3 + /* This function should be called from _writeToClient when the reply list is not empty, * it gathers the scattered buffers from reply list and sends them away with connWritev. * If we write successfully, it returns C_OK, otherwise, C_ERR is returned. * Sets the c->nwritten to the number of bytes the server wrote to the client. - * Can be called from the main thread or an I/O thread - * - * INTERNALS - * The writevToClient strives to write all client reply buffers to the client connection. - * However, it may encounter NET_MAX_WRITES_PER_EVENT or IOV_MAX or socket limit. In such case, - * some client reply buffers will be written completely and some partially. - * In next invocation writevToClient should resume from the exact position where it stopped. - * Also writevToClient should communicate to _postWriteToClient which buffers written completely - * and can be released. It is intricate in case of reply offloading as length of reply buffer does not match - * to network bytes out. - * - * For this purpose, writevToClient uses 3 data members on the client struct as input/output paramaters: - * io_last_written_buf - Last buffer that has been written to the client connection - * io_last_written_bufpos - The buffer has been written until this position - * io_last_written_data_len - The actual length of the data written from this buffer - * This length differs from written bufpos in case of reply offload - * - * The writevToClient uses addBufferToReplyIOV, addCompoundBufferToReplyIOV, addOffloadedBulkToReplyIOV, addPlainBufferToReplyIOV - * to build reply iovec array. These functions know to skip io_last_written_data_len, specifically addPlainBufferToReplyIOV - * - * In the end of execution writevToClient calls saveLastWrittenBuf for calculating "last written" buf/pos/data_len - * and storing on the client. While building reply iov, writevToClient gathers auxiliary bufWriteMetadata that - * helps in this calculation. In some cases, It may take several (> 2) invocations for writevToClient to write reply - * from a single buffer but saveLastWrittenBuf knows to calculate "last written" buf/pos/data_len properly - * - * The _postWriteToClient uses io_last_written_buf and io_last_written_bufpos in order to detect completely written buffers - * and release them - * - * */ + * Can be called from the main thread or an I/O thread */ static int writevToClient(client *c) { int iovmax = min(IOV_MAX, c->conn->iovcnt); struct iovec iov_arr[iovmax]; - char prefixes[iovmax / 3 + 1][LONG_STR_SIZE + 3]; + /* iov_arr can accomodate iovmax / NUM_OF_IOV_PER_BULK_OFFLOAD full bulk offloads + * and one partial bulk offload */ + char prefixes[iovmax / NUM_OF_IOV_PER_BULK_OFFLOAD + 1][BULK_STRING_LEN_PREFIX_MAX_SIZE]; char crlf[2] = {'\r', '\n'}; int bufcnt = 0; + /* +1 is for c->buf */ bufWriteMetadata metadata[listLength(c->reply) + 1]; replyIOV reply; @@ -2346,34 +2335,33 @@ static int writevToClient(client *c) { addBufferToReplyIOV(c->buf, bufpos, &reply, &metadata[bufcnt++]); } - listIter iter; - listNode *next; - listRewind(c->reply, &iter); - while ((next = listNext(&iter)) && !reply.limit_reached) { - clientReplyBlock *o = listNodeValue(next); + if (lastblock) { + listIter iter; + listNode *next; + listRewind(c->reply, &iter); + while ((next = listNext(&iter)) && !reply.limit_reached) { + clientReplyBlock *o = listNodeValue(next); - size_t used = o->used; - /* Use c->io_last_bufpos as the currently used portion of the block. - * We use io_last_bufpos instead of o->used to ensure that we only access data guaranteed to be visible to the - * current thread. Using o->used, which may have been updated by the main thread, could lead to accessing data - * that may not yet be visible to the current thread*/ - if (!inMainThread() && next == lastblock) used = c->io_last_bufpos; + size_t used = o->used; + /* Use c->io_last_bufpos as the currently used portion of the block. + * We use io_last_bufpos instead of o->used to ensure that we only access data guaranteed to be visible to the + * current thread. Using o->used, which may have been updated by the main thread, could lead to accessing data + * that may not yet be visible to the current thread*/ + if (!inMainThread() && next == lastblock) used = c->io_last_bufpos; - if (used == 0) { /* empty node, skip over it. */ - if (next == lastblock) break; - continue; - } + if (used == 0) { /* empty node, skip over it. */ + if (next == lastblock) break; + continue; + } - addBufferToReplyIOV(o->buf, used, &reply, &metadata[bufcnt]); - if (!metadata[bufcnt].data_len) break; - bufcnt++; + addBufferToReplyIOV(o->buf, used, &reply, &metadata[bufcnt]); + if (!metadata[bufcnt].data_len) break; + bufcnt++; - if (next == lastblock) break; + if (next == lastblock) break; + } } - serverAssert(reply.last_written_len == 0); - serverAssert(reply.cnt != 0); - ssize_t totwritten = 0; while (1) { int nwritten = connWritev(c->conn, reply.iov, reply.cnt); @@ -2461,24 +2449,26 @@ void resetLastWrittenBuf(client *c) { } static void releaseBufOffloads(char *buf, size_t bufpos) { - char *ptr = buf; - while (ptr < buf + bufpos) { - payloadHeader *header = (payloadHeader *)ptr; - ptr += sizeof(payloadHeader); + char *buf_ptr = buf; + while (buf_ptr < buf + bufpos) { + payloadHeader *header = (payloadHeader *)buf_ptr; + buf_ptr += sizeof(payloadHeader); - if (header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD) { + if (header->type == CLIENT_REPLY_BULK_OFFLOAD) { clusterSlotStatsAddNetworkBytesOutForSlot(header->slot, header->actual_len); - robj** obj_ptr = (robj**)ptr; + void **obj_ptrs = (void **)buf_ptr; size_t len = header->len; while (len > 0) { - decrRefCount(*obj_ptr); - obj_ptr++; - len -= sizeof(obj_ptr); + decrRefCount(*obj_ptrs); + obj_ptrs++; + /* Skip pointer to string */ + obj_ptrs++; + len -= sizeof(void *) * 2; } } - ptr += header->len; + buf_ptr += header->len; } } @@ -2496,10 +2486,6 @@ void releaseReplyOffloads(client *c) { } } -/* - * See INTERNALS note on writevToClient for explanation about - * io_last_written_buf and io_last_written_bufpos - */ static void _postWriteToClient(client *c) { if (c->nwritten <= 0) return; server.stat_net_output_bytes += c->nwritten; diff --git a/src/server.h b/src/server.h index 96fe5f58f8..10bf3c0e27 100644 --- a/src/server.h +++ b/src/server.h @@ -769,7 +769,7 @@ typedef struct payloadHeader payloadHeader; /* Defined in networking.c */ * which is actually a linked list of blocks like that, that is: client->reply. */ typedef struct clientReplyBlock { size_t size, used; - payloadHeader* last_header; + payloadHeader *last_header; char buf[]; } clientReplyBlock; @@ -1199,7 +1199,7 @@ typedef struct client { unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ listNode clients_pending_write_node; /* list node in clients_pending_write or in clients_pending_io_write list */ size_t bufpos; - payloadHeader* last_header; /* Pointer to the last header in a buffer in reply offload mode */ + payloadHeader *last_header; /* Pointer to the last header in a buffer in reply offload mode */ int original_argc; /* Num of arguments of original command if arguments were rewritten. */ robj **original_argv; /* Arguments of original command if arguments were rewritten. */ /* Client flags and state indicators */ @@ -1647,9 +1647,10 @@ struct valkeyServer { int io_threads_num; /* Number of IO threads to use. */ int active_io_threads_num; /* Current number of active IO threads, includes main thread. */ int events_per_io_thread; /* Number of events on the event loop to trigger IO threads activation. */ + int min_io_threads_for_reply_offload; /* Minimum number of IO threads for enabling reply offload */ + int min_io_threads_value_prefetch_off; /* Minimum number of IO threads for disabling value prefetch */ int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ - int reply_offload_enabled; /* Reply offload enabled or not */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */ diff --git a/src/unit/test_networking.c b/src/unit/test_networking.c index 6eeb20302a..31bf0bd78c 100644 --- a/src/unit/test_networking.c +++ b/src/unit/test_networking.c @@ -130,7 +130,7 @@ int test_rewriteClientCommandArgument(int argc, char **argv, int flags) { return 0; } -static client* createTestClient(void) { +static client *createTestClient(void) { client *c = zcalloc(sizeof(client)); c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size); @@ -149,23 +149,26 @@ static void freeReplyOffloadClient(client *c) { zfree(c); } +/* Each bulk offload puts 2 pointers to a reply buffer */ +#define PTRS_LEN (sizeof(void *) * 2) + int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) { UNUSED(argc); UNUSED(argv); UNUSED(flags); - client * c = createTestClient(); + client *c = createTestClient(); /* Test 1: Add bulk offloads to the buffer */ robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test")); _addBulkOffloadToBufferOrList(c, obj); TEST_ASSERT(obj->refcount == 2); - TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + sizeof(void*)); + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + PTRS_LEN); payloadHeader *header1 = c->last_header; - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == sizeof(void*)); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == PTRS_LEN); robj **ptr = (robj **)(c->buf + sizeof(payloadHeader)); TEST_ASSERT(obj == *ptr); @@ -173,44 +176,44 @@ int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) { robj *obj2 = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test2")); _addBulkOffloadToBufferOrList(c, obj2); - TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * sizeof(void*)); - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == 2 * sizeof(void*)); + /* 2 offloads expected in c->buf */ + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * PTRS_LEN); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == 2 * PTRS_LEN); - ptr = (robj **)(c->buf + sizeof(payloadHeader) + sizeof(void*)); + ptr = (robj **)(c->buf + sizeof(payloadHeader) + PTRS_LEN); TEST_ASSERT(obj2 == *ptr); /* Test 2: Add plain reply to the buffer */ - const char* plain = "+OK\r\n"; + const char *plain = "+OK\r\n"; size_t plain_len = strlen(plain); _addReplyToBufferOrList(c, plain, plain_len); - TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + plain_len); - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == 2 * sizeof(void*)); + /* 2 offloads and plain reply expected in c->buf. So 2 headers expected as well */ + TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * PTRS_LEN + plain_len); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == 2 * PTRS_LEN); payloadHeader *header2 = c->last_header; - TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA); + TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN); TEST_ASSERT(header2->len == plain_len); + /* Add more plain replies. Check same plain reply header updated properly */ for (int i = 0; i < 9; ++i) _addReplyToBufferOrList(c, plain, plain_len); - TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + 10 * plain_len); - TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA); + TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * PTRS_LEN + 10 * plain_len); + TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN); TEST_ASSERT(header2->len == plain_len * 10); /* Test 3: Add one more bulk offload to the buffer */ _addBulkOffloadToBufferOrList(c, obj); TEST_ASSERT(obj->refcount == 3); - TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * sizeof(void*) + 10 * plain_len); + TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * PTRS_LEN + 10 * plain_len); payloadHeader *header3 = c->last_header; - TEST_ASSERT(header3->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - ptr = (robj **)((char*)c->last_header + sizeof(payloadHeader)); + TEST_ASSERT(header3->type == CLIENT_REPLY_BULK_OFFLOAD); + ptr = (robj **)((char *)c->last_header + sizeof(payloadHeader)); TEST_ASSERT(obj == *ptr); + releaseReplyOffloads(c); decrRefCount(obj); - decrRefCount(obj); - decrRefCount(obj); - - decrRefCount(obj2); decrRefCount(obj2); freeReplyOffloadClient(c); @@ -227,31 +230,35 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { /* Test 1: Add bulk offloads to the reply list */ - /* Reply len to fill the buffer almost completely */ + /* Select reply length so that there is place for 2 headers and 4 bytes only + * 4 bytes is not enough for object pointer(s) + * This will force bulk offload to be added to reply list + */ size_t reply_len = c->buf_usable_size - 2 * sizeof(payloadHeader) - 4; - char *reply = zmalloc(reply_len); memset(reply, 'a', reply_len); _addReplyToBufferOrList(c, reply, reply_len); TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len); TEST_ASSERT(listLength(c->reply) == 0); + /* As bulk offload header+pointer can't be accommodated in c->buf + * then one block is expected in c->reply */ robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test")); _addBulkOffloadToBufferOrList(c, obj); - TEST_ASSERT(obj->refcount == 2); TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len); TEST_ASSERT(listLength(c->reply) == 1); + /* Check bulk offload header+pointer inside c->reply */ listIter iter; listRewind(c->reply, &iter); listNode *next = listNext(&iter); clientReplyBlock *blk = listNodeValue(next); - TEST_ASSERT(blk->used == sizeof(payloadHeader) + sizeof(void*)); + TEST_ASSERT(blk->used == sizeof(payloadHeader) + PTRS_LEN); payloadHeader *header1 = blk->last_header; - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == sizeof(void*)); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == PTRS_LEN); robj **ptr = (robj **)(blk->buf + sizeof(payloadHeader)); TEST_ASSERT(obj == *ptr); @@ -260,9 +267,9 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { _addBulkOffloadToBufferOrList(c, obj); TEST_ASSERT(obj->refcount == 3); TEST_ASSERT(listLength(c->reply) == 1); - TEST_ASSERT(blk->used == sizeof(payloadHeader) + 2 * sizeof(void*)); - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == 2 * sizeof(void*)); + TEST_ASSERT(blk->used == sizeof(payloadHeader) + 2 * PTRS_LEN); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == 2 * PTRS_LEN); /* Test 3: Add plain replies to cause reply list grow */ while (reply_len < blk->size - blk->used) _addReplyToBufferOrList(c, reply, reply_len); @@ -277,13 +284,14 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { clientReplyBlock *blk2 = listNodeValue(next); /* last header in 2nd block */ payloadHeader *header3 = blk2->last_header; - TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA && header3->type == CLIENT_REPLY_PAYLOAD_DATA); + TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN && header3->type == CLIENT_REPLY_PLAIN); TEST_ASSERT((header2->len + header3->len) % reply_len == 0); - decrRefCount(obj); - decrRefCount(obj); + releaseReplyOffloads(c); decrRefCount(obj); + zfree(reply); + freeReplyOffloadClient(c); return 0; @@ -294,7 +302,7 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { UNUSED(argv); UNUSED(flags); - const char* expected_reply = "$5\r\nhello\r\n"; + const char *expected_reply = "$5\r\nhello\r\n"; ssize_t total_len = strlen(expected_reply); const int iovmax = 16; char crlf[2] = {'\r', '\n'}; @@ -314,7 +322,7 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { TEST_ASSERT(reply.iov_len_total == total_len); TEST_ASSERT(reply.cnt == 3); - const char* ptr = expected_reply; + const char *ptr = expected_reply; for (int i = 0; i < reply.cnt; ++i) { TEST_ASSERT(memcmp(ptr, reply.iov[i].iov_base, reply.iov[i].iov_len) == 0); ptr += reply.iov[i].iov_len; @@ -335,12 +343,12 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { initReplyIOV(c, iovmax, iov_arr2, prefixes2, crlf, &reply2); addBufferToReplyIOV(c->buf, c->bufpos, &reply2, &metadata2[0]); TEST_ASSERT(reply2.iov_len_total == total_len - 1); - TEST_ASSERT((*(char*)reply2.iov[0].iov_base) == '5'); + TEST_ASSERT((*(char *)reply2.iov[0].iov_base) == '5'); /* Test 4: Last written buf/pos/data_len after 2nd invocation */ saveLastWrittenBuf(c, metadata2, 1, reply2.iov_len_total, 4); /* 4 more bytes has been written */ TEST_ASSERT(c->io_last_written_buf == c->buf); - TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */ + TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */ TEST_ASSERT(c->io_last_written_data_len == 5); /* 1 + 4 */ /* Test 5: 3rd writevToclient invocation */ @@ -352,7 +360,7 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { initReplyIOV(c, iovmax, iov_arr3, prefixes3, crlf, &reply3); addBufferToReplyIOV(c->buf, c->bufpos, &reply3, &metadata3[0]); TEST_ASSERT(reply3.iov_len_total == total_len - 5); - TEST_ASSERT((*(char*)reply3.iov[0].iov_base) == 'e'); + TEST_ASSERT((*(char *)reply3.iov[0].iov_base) == 'e'); /* Test 6: Last written buf/pos/data_len after 3rd invocation */ saveLastWrittenBuf(c, metadata3, 1, reply3.iov_len_total, reply3.iov_len_total); /* everything has been written */ @@ -366,4 +374,4 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { freeReplyOffloadClient(c); return 0; -} \ No newline at end of file +} diff --git a/tests/instances.tcl b/tests/instances.tcl index 5cc96b0edb..301b3b78b8 100644 --- a/tests/instances.tcl +++ b/tests/instances.tcl @@ -111,6 +111,7 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} { if {$::io_threads} { puts $cfg "io-threads 2" puts $cfg "events-per-io-thread 0" + puts $cfg "min-io-threads-reply-offload-on 2" } if {$::log_req_res} { diff --git a/tests/support/server.tcl b/tests/support/server.tcl index ed3805d549..e943356468 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -251,11 +251,6 @@ proc tags_acceptable {tags err_return} { return 0 } - if {$::reply_offload && [lsearch $tags "reply-offload:skip"] >= 0} { - set err "Not supported in reply-offload mode" - return 0 - } - if {$::tcl_version < 8.6 && [lsearch $tags "ipv6"] >= 0} { set err "TCL version is too low and does not support this" return 0 @@ -521,10 +516,7 @@ proc start_server {options {code undefined}} { if {$::io_threads} { dict set config "io-threads" 2 dict set config "events-per-io-thread" 0 - } - - if {$::reply_offload} { - dict set config "reply-offload" "yes" + dict set config "min-io-threads-reply-offload-on" 2 } foreach line $data { diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 1c00bc8846..54bb923674 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -55,7 +55,6 @@ set ::valgrind 0 set ::durable 0 set ::tls 0 set ::io_threads 0 -set ::reply_offload 0 set ::tls_module 0 set ::stack_logging 0 set ::verbose 0 @@ -599,7 +598,6 @@ proc print_help_screen {} { "--wait-server Wait after server is started (so that you can attach a debugger)." "--dump-logs Dump server log on test failure." "--io-threads Run tests with IO threads." - "--reply-offload Run tests with reply offload enabled." "--tls Run tests in TLS mode." "--tls-module Run tests in TLS mode with Valkey module." "--host Run tests against an external host." @@ -661,8 +659,6 @@ for {set j 0} {$j < [llength $argv]} {incr j} { set ::quiet 1 } elseif {$opt eq {--io-threads}} { set ::io_threads 1 - } elseif {$opt eq {--reply-offload}} { - set ::reply_offload 1 } elseif {$opt eq {--tls} || $opt eq {--tls-module}} { package require tls 1.6 set ::tls 1 diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl index 4140b6d05e..f9d890e1c7 100644 --- a/tests/unit/client-eviction.tcl +++ b/tests/unit/client-eviction.tcl @@ -1,4 +1,4 @@ -tags {"external:skip logreqres:skip reply-offload:skip"} { +tags {"external:skip logreqres:skip"} { # Get info about a server client connection: # name - name of client we want to query @@ -52,6 +52,8 @@ proc kb {v} { start_server {} { set maxmemory_clients 3000000 r config set maxmemory-clients $maxmemory_clients + # Disable reply offload + r config set min-io-threads-reply-offload-on 0 test "client evicted due to large argv" { r flushdb @@ -332,6 +334,8 @@ start_server {} { set obuf_limit [mb 3] r config set maxmemory-clients $maxmemory_clients r config set client-output-buffer-limit "normal $obuf_limit 0 0" + # Disable reply offload + r config set min-io-threads-reply-offload-on 0 test "avoid client eviction when client is freed by output buffer limit" { r flushdb @@ -385,13 +389,15 @@ start_server {} { } start_server {} { + # Disable reply offload + r config set min-io-threads-reply-offload-on 0 + test "decrease maxmemory-clients causes client eviction" { set maxmemory_clients [mb 4] set client_count 10 set qbsize [expr ($maxmemory_clients - [mb 1]) / $client_count] r config set maxmemory-clients $maxmemory_clients - # Make multiple clients consume together roughly 1mb less than maxmemory_clients set rrs {} for {set j 0} {$j < $client_count} {incr j} { @@ -426,6 +432,9 @@ start_server {} { } start_server {} { + # Disable reply offload + r config set min-io-threads-reply-offload-on 0 + test "evict clients only until below limit" { set client_count 10 set client_mem [mb 1] @@ -434,6 +443,7 @@ start_server {} { r client setname control r client no-evict on + # Make multiple clients consume together roughly 1mb less than maxmemory_clients set total_client_mem 0 set max_client_mem 0 @@ -488,6 +498,9 @@ start_server {} { } start_server {} { + # Disable reply offload + r config set min-io-threads-reply-offload-on 0 + test "evict clients in right order (large to small)" { # Note that each size step needs to be at least x2 larger than previous step # because of how the client-eviction size bucketing works @@ -555,6 +568,9 @@ start_server {} { } start_server {} { + # Disable reply offload + r config set min-io-threads-reply-offload-on 0 + foreach type {"client no-evict" "maxmemory-clients disabled"} { r flushall r client no-evict on diff --git a/tests/unit/info.tcl b/tests/unit/info.tcl index 94a56bb4a5..b911e9dd22 100644 --- a/tests/unit/info.tcl +++ b/tests/unit/info.tcl @@ -384,6 +384,10 @@ start_server {tags {"info" "external:skip" "debug_defrag:skip"}} { } test {stats: client input and output buffer limit disconnections} { + # Disable reply offload + set reply_offload [lindex [r config get min-io-threads-reply-offload-on] 1] + r config set min-io-threads-reply-offload-on 0 + r config resetstat set info [r info stats] assert_equal [getInfoProperty $info client_query_buffer_limit_disconnections] {0} @@ -407,9 +411,11 @@ start_server {tags {"info" "external:skip" "debug_defrag:skip"}} { r set key [string repeat a 100000] ;# to trigger output buffer limit check this needs to be big catch {r get key} r config set client-output-buffer-limit $org_outbuf_limit + # Restore min-io-threads-reply-offload-on value + r config set min-io-threads-reply-offload-on $reply_offload set info [r info stats] assert_equal [getInfoProperty $info client_output_buffer_limit_disconnections] {1} - } {} {logreqres:skip reply-offload:skip} ;# same as obuf-limits.tcl, skip logreqres + } {} {logreqres:skip} ;# same as obuf-limits.tcl, skip logreqres test {clients: pubsub clients} { set info [r info clients] diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 504b787bda..bf18d2e571 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -1,7 +1,9 @@ -start_server {tags {"maxmemory external:skip reply-offload:skip"}} { +start_server {tags {"maxmemory external:skip"}} { r config set maxmemory 11mb r config set maxmemory-policy allkeys-lru set server_pid [s process_id] + # Disable reply offload + r config set min-io-threads-reply-offload-on 0 proc init_test {client_eviction} { r flushdb sync diff --git a/tests/unit/obuf-limits.tcl b/tests/unit/obuf-limits.tcl index 1f391dfd73..f4b3f328ba 100644 --- a/tests/unit/obuf-limits.tcl +++ b/tests/unit/obuf-limits.tcl @@ -1,4 +1,7 @@ -start_server {tags {"obuf-limits external:skip logreqres:skip reply-offload:skip"}} { +start_server {tags {"obuf-limits external:skip logreqres:skip"}} { + # Disable reply offload + r config set min-io-threads-reply-offload-on 0 + test {CONFIG SET client-output-buffer-limit} { set oldval [lindex [r config get client-output-buffer-limit] 1] diff --git a/tests/unit/replybufsize.tcl b/tests/unit/replybufsize.tcl index 4929fa832f..0a7be405b3 100644 --- a/tests/unit/replybufsize.tcl +++ b/tests/unit/replybufsize.tcl @@ -8,8 +8,10 @@ proc get_reply_buffer_size {cname} { return $rbufsize } -start_server {tags {"replybufsize reply-offload:skip"}} { - +start_server {tags {"replybufsize"}} { + # Disable reply offload + r config set min-io-threads-reply-offload-on 0 + test {verify reply buffer limits} { # In order to reduce test time we can set the peak reset time very low r debug replybuffer peak-reset-time 100 diff --git a/valkey.conf b/valkey.conf index 3cee81c2c3..e23aea39de 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1439,12 +1439,6 @@ lazyfree-lazy-user-flush yes # # prefetch-batch-max-size 16 # -# For use cases where command replies include Bulk strings (e.g. GET, MGET) -# reply offload can be enabled to eliminate espensive memory access -# and redundant data copy performed by main thread -# -# reply-offload yes -# # NOTE: # 1. The 'io-threads-do-reads' config is deprecated and has no effect. Please # avoid using this config if possible.