Skip to content

Commit

Permalink
move clientCron onto a separate timer
Browse files Browse the repository at this point in the history
  • Loading branch information
JimB123 committed Dec 3, 2024
1 parent 9f8b174 commit f18e63c
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 85 deletions.
13 changes: 6 additions & 7 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ void loadServerConfigFromString(char *config) {
{"list-max-ziplist-value", 2, 2},
{"lua-replicate-commands", 2, 2},
{"io-threads-do-reads", 2, 2},
{"dynamic-hz", 2, 2},
{NULL, 0},
};
char buf[1024];
Expand Down Expand Up @@ -626,8 +627,8 @@ void loadServerConfigFromString(char *config) {
}

/* To ensure backward compatibility and work while hz is out of range */
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;
if (server.hz < CONFIG_MIN_HZ) server.hz = CONFIG_MIN_HZ;
if (server.hz > CONFIG_MAX_HZ) server.hz = CONFIG_MAX_HZ;

sdsfreesplitres(lines, totlines);
reading_config_file = 0;
Expand Down Expand Up @@ -2472,9 +2473,8 @@ static int updateHZ(const char **err) {
UNUSED(err);
/* Hz is more a hint from the user, so we accept values out of range
* but cap them to reasonable values. */
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;
server.hz = server.config_hz;
if (server.hz < CONFIG_MIN_HZ) server.hz = CONFIG_MIN_HZ;
if (server.hz > CONFIG_MAX_HZ) server.hz = CONFIG_MAX_HZ;
return 1;
}

Expand Down Expand Up @@ -3168,7 +3168,6 @@ standardConfig static_configs[] = {
createBoolConfig("activerehashing", NULL, MODIFIABLE_CONFIG, server.activerehashing, 1, NULL, NULL),
createBoolConfig("stop-writes-on-bgsave-error", NULL, MODIFIABLE_CONFIG, server.stop_writes_on_bgsave_err, 1, NULL, NULL),
createBoolConfig("set-proc-title", NULL, IMMUTABLE_CONFIG, server.set_proc_title, 1, NULL, NULL), /* Should setproctitle be used? */
createBoolConfig("dynamic-hz", NULL, MODIFIABLE_CONFIG, server.dynamic_hz, 1, NULL, NULL), /* Adapt hz to # of clients.*/
createBoolConfig("lazyfree-lazy-eviction", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_eviction, 1, NULL, NULL),
createBoolConfig("lazyfree-lazy-expire", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_expire, 1, NULL, NULL),
createBoolConfig("lazyfree-lazy-server-del", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_server_del, 1, NULL, NULL),
Expand Down Expand Up @@ -3302,7 +3301,7 @@ standardConfig static_configs[] = {
createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, INT_MIN, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, INT_MIN, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_replicas_to_write, 0, INTEGER_CONFIG, NULL, updateGoodReplicas),
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_replicas_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodReplicas),
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
Expand Down
108 changes: 57 additions & 51 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) {
size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};

int clientsCronTrackExpansiveClients(client *c, int time_idx) {
int clientsCronTrackExpensiveClients(client *c, int time_idx) {
size_t qb_size = c->querybuf ? sdsAllocSize(c->querybuf) : 0;
size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0;
size_t in_usage = qb_size + c->argv_len_sum + argv_size;
Expand Down Expand Up @@ -1035,8 +1035,8 @@ int updateClientMemUsageAndBucket(client *c) {
}

/* Return the max samples in the memory usage of clients tracked by
* the function clientsCronTrackExpansiveClients(). */
void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
* the function clientsCronTrackExpensiveClients(). */
void getExpensiveClientsInfo(size_t *in_usage, size_t *out_usage) {
size_t i = 0, o = 0;
for (int j = 0; j < CLIENTS_PEAK_MEM_USAGE_SLOTS; j++) {
if (ClientsPeakMemInput[j] > i) i = ClientsPeakMemInput[j];
Expand All @@ -1046,38 +1046,23 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
*out_usage = o;
}

/* This function is called by serverCron() and is used in order to perform
/* This function is called by clientsTimerProc() and is used in order to perform
* operations on clients that are important to perform constantly. For instance
* we use this function in order to disconnect clients after a timeout, including
* clients blocked in some blocking command with a non-zero timeout.
*
* The function makes some effort to process all the clients every second, even
* if this cannot be strictly guaranteed, since serverCron() may be called with
* an actual frequency lower than server.hz in case of latency events like slow
* if this cannot be strictly guaranteed, since clientsTimerProc() may be called with
* an actual frequency lower than the intended rate in case of latency events like slow
* commands.
*
* It is very important for this function, and the functions it calls, to be
* very fast: sometimes the server has tens of hundreds of connected clients, and the
* default server.hz value is 10, so sometimes here we need to process thousands
* of clients per second, turning this function into a source of latency.
* very fast. Sometimes the server has tens of thousands of connected clients, and all
* of them need to be processed every second.
*/
#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) {
/* Try to process at least numclients/server.hz of clients
* per call. Since normally (if there are no big latency events) this
* function is called server.hz times per second, in the average case we
* process all the clients in 1 second. */
int numclients = listLength(server.clients);
int iterations = numclients / server.hz;
static void clientsCron(int clients_this_cycle) {
mstime_t now = mstime();

/* Process at least a few clients while we are at it, even if we need
* to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
* of processing each client once per second. */
if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ? numclients : CLIENTS_CRON_MIN_ITERATIONS;


int curr_peak_mem_usage_slot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
/* Always zero the next sample, so that when we switch to that second, we'll
* only register samples that are greater in that second without considering
Expand All @@ -1088,14 +1073,13 @@ void clientsCron(void) {
* some slow command is called taking multiple seconds to execute. In that
* case our array may end containing data which is potentially older
* than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem
* since here we want just to track if "recently" there were very expansive
* since here we want just to track if "recently" there were very expensive
* clients from the POV of memory usage. */
int zeroidx = (curr_peak_mem_usage_slot + 1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
ClientsPeakMemInput[zeroidx] = 0;
ClientsPeakMemOutput[zeroidx] = 0;


while (listLength(server.clients) && iterations--) {
while (listLength(server.clients) && clients_this_cycle--) {
client *c;
listNode *head;

Expand All @@ -1105,14 +1089,14 @@ void clientsCron(void) {
c = listNodeValue(head);
listRotateHeadToTail(server.clients);
if (c->io_read_state != CLIENT_IDLE || c->io_write_state != CLIENT_IDLE) continue;

/* The following functions do different service checks on the client.
* The protocol is that they return non-zero if the client was
* terminated. */
if (clientsCronHandleTimeout(c, now)) continue;
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronResizeOutputBuffer(c, now)) continue;

if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
if (clientsCronTrackExpensiveClients(c, curr_peak_mem_usage_slot)) continue;

/* Iterating all the clients in getMemoryOverheadData() is too slow and
* in turn would make the INFO command too slow. So we perform this
Expand All @@ -1126,6 +1110,40 @@ void clientsCron(void) {
}
}

/* A periodic timer that performs client maintenance.
* This cron task follows the following rules:
* - All clients need to be checked (at least) once per second
* - To manage latency, we don't check more than MAX_CLIENTS_PER_CLOCK_TICK at a time
* - The minimum rate will be defined by server.hz
* - At least CLIENTS_CRON_MIN_ITERATIONS will be performed each cycle
*/
#define CLIENTS_CRON_MIN_ITERATIONS 5
long long clientsTimerProc(struct aeEventLoop *eventLoop, long long id, void *clientData) {
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);

int numclients = listLength(server.clients);
int clients_this_cycle = numclients / server.hz; // Initial computation based on standard hz
int delayMs;

if (clients_this_cycle < CLIENTS_CRON_MIN_ITERATIONS) {
clients_this_cycle = min(numclients, CLIENTS_CRON_MIN_ITERATIONS);
}

if (clients_this_cycle > MAX_CLIENTS_PER_CLOCK_TICK) {
clients_this_cycle = MAX_CLIENTS_PER_CLOCK_TICK;
float required_hz = (float)numclients / MAX_CLIENTS_PER_CLOCK_TICK;
delayMs = 1000.0 / required_hz;
} else {
delayMs = 1000 / server.hz;
}

clientsCron(clients_this_cycle);

return delayMs;
}

/* This function handles 'background' operations we are required to do
* incrementally in the databases, such as active key expiring, resizing,
* rehashing. */
Expand Down Expand Up @@ -1346,19 +1364,6 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa
* handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

server.hz = server.config_hz;
/* Adapt the server.hz value to the number of configured clients. If we have
* many clients, we want to call serverCron() with an higher frequency. */
if (server.dynamic_hz) {
while (listLength(server.clients) / server.hz > MAX_CLIENTS_PER_CLOCK_TICK) {
server.hz *= 2;
if (server.hz > CONFIG_MAX_HZ) {
server.hz = CONFIG_MAX_HZ;
break;
}
}
}

/* for debug purposes: skip actual cron work if pause_cron is on */
if (server.pause_cron) return 1000 / server.hz;

Expand Down Expand Up @@ -1444,9 +1449,6 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa
}
}

/* We need to do a few operations on clients asynchronously. */
clientsCron();

/* Handle background operations on databases. */
databasesCron();

Expand Down Expand Up @@ -2629,7 +2631,6 @@ void initServer(void) {
/* Initialization after setting defaults from the config system. */
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
server.fsynced_reploff = server.aof_enabled ? 0 : -1;
server.hz = server.config_hz;
server.in_fork_child = CHILD_TYPE_NONE;
server.rdb_pipe_read = -1;
server.rdb_child_exit_pipe = -1;
Expand Down Expand Up @@ -2775,10 +2776,15 @@ void initServer(void) {
server.acl_info.invalid_channel_accesses = 0;

/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
* operations incrementally, like eviction of unaccessed expired keys, etc. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
serverPanic("Can't create serverCron timer.");
exit(1);
}
/* A separate timer for client maintenance. Runs at a variable speed depending
* on the client count. */
if (aeCreateTimeEvent(server.el, 1, clientsTimerProc, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event clientsTimerProc timer.");
exit(1);
}

Expand Down Expand Up @@ -5549,7 +5555,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"uptime_in_seconds:%I\r\n", (int64_t)uptime,
"uptime_in_days:%I\r\n", (int64_t)(uptime / (3600 * 24)),
"hz:%i\r\n", server.hz,
"configured_hz:%i\r\n", server.config_hz,
"configured_hz:%i\r\n", server.hz,
"lru_clock:%u\r\n", server.lruclock,
"executable:%s\r\n", server.executable ? server.executable : "",
"config_file:%s\r\n", server.configfile ? server.configfile : "",
Expand All @@ -5570,7 +5576,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
if (all_sections || (dictFind(section_dict, "clients") != NULL)) {
size_t maxin, maxout;
unsigned long blocking_keys, blocking_keys_on_nokey, watched_keys;
getExpansiveClientsInfo(&maxin, &maxout);
getExpensiveClientsInfo(&maxin, &maxout);
totalNumberOfStatefulKeys(&blocking_keys, &blocking_keys_on_nokey, &watched_keys);
if (sections++) info = sdscat(info, "\r\n");
info = sdscatprintf(
Expand Down
4 changes: 0 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1681,10 +1681,6 @@ struct valkeyServer {
char *configfile; /* Absolute config file path, or NULL */
char *executable; /* Absolute executable file path. */
char **exec_argv; /* Executable argv vector (copy). */
int dynamic_hz; /* Change hz value depending on # of clients. */
int config_hz; /* Configured HZ value. May be different than
the actual 'hz' field value if dynamic-hz
is enabled. */
mode_t umask; /* The umask value of the process on startup */
int hz; /* serverCron() calls frequency in hertz */
int in_fork_child; /* indication that this is a fork child */
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -730,8 +730,6 @@ test {diskless loading short read} {
$replica config set repl-diskless-load swapdb
$master config set hz 500
$replica config set hz 500
$master config set dynamic-hz no
$replica config set dynamic-hz no
# Try to fill the master with all types of data types / encodings
set start [clock clicks -milliseconds]

Expand Down
2 changes: 0 additions & 2 deletions tests/unit/moduleapi/hooks.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ tags "modules" {
}
# set some configs that will cause many loading progress events during aof loading
r config set key-load-delay 500
r config set dynamic-hz no
r config set hz 500
r DEBUG LOADAOF
assert_equal [r hooks.event_last loading-aof-start] 0
Expand All @@ -73,7 +72,6 @@ tags "modules" {
}
}
# undo configs before next test
r config set dynamic-hz yes
r config set key-load-delay 0

test {Test module rdb save hook} {
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/moduleapi/testrdb.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ tags "modules" {
$replica config set repl-diskless-load swapdb
$master config set hz 500
$replica config set hz 500
$master config set dynamic-hz no
$replica config set dynamic-hz no
set start [clock clicks -milliseconds]
for {set k 0} {$k < 30} {incr k} {
r testrdb.set.key key$k [string repeat A [expr {int(rand()*1000000)}]]
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/other.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ start_server {tags {"other external:skip"}} {
}

start_cluster 1 0 {tags {"other external:skip cluster slow"}} {
r config set dynamic-hz no hz 500
r config set hz 500
test "Server can trigger resizing" {
r flushall
# hashslot(foo) is 12182
Expand Down
16 changes: 0 additions & 16 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2270,22 +2270,6 @@ client-output-buffer-limit pubsub 32mb 8mb 60
# 100 only in environments where very low latency is required.
hz 10

# Normally it is useful to have an HZ value which is proportional to the
# number of clients connected. This is useful in order, for instance, to
# avoid too many clients are processed for each background task invocation
# in order to avoid latency spikes.
#
# Since the default HZ value by default is conservatively set to 10, the server
# offers, and enables by default, the ability to use an adaptive HZ value
# which will temporarily raise when there are many connected clients.
#
# When dynamic HZ is enabled, the actual configured HZ will be used
# as a baseline, but multiples of the configured HZ value will be actually
# used as needed once more clients are connected. In this way an idle
# instance will use very little CPU time while a busy instance will be
# more responsive.
dynamic-hz yes

# When a child rewrites the AOF file, if the following option is enabled
# the file will be fsync-ed every 4 MB of data generated. This is useful
# in order to commit the file to the disk more incrementally and avoid
Expand Down

0 comments on commit f18e63c

Please sign in to comment.