-
Notifications
You must be signed in to change notification settings - Fork 702
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
move clientCron onto a separate timer #1387
base: unstable
Are you sure you want to change the base?
Changes from all commits
51db017
cb816fb
ea6499b
eaea2b9
82b1d18
0264c99
172d46c
755e7ec
dfe8f17
1296a20
d25dc5e
0fd1b30
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -913,7 +913,7 @@ int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) { | |
size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; | ||
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; | ||
|
||
int clientsCronTrackExpansiveClients(client *c, int time_idx) { | ||
int clientsCronTrackExpensiveClients(client *c, int time_idx) { | ||
size_t qb_size = c->querybuf ? sdsAllocSize(c->querybuf) : 0; | ||
size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; | ||
size_t in_usage = qb_size + c->argv_len_sum + argv_size; | ||
|
@@ -1035,8 +1035,8 @@ int updateClientMemUsageAndBucket(client *c) { | |
} | ||
|
||
/* Return the max samples in the memory usage of clients tracked by | ||
* the function clientsCronTrackExpansiveClients(). */ | ||
void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { | ||
* the function clientsCronTrackExpensiveClients(). */ | ||
void getExpensiveClientsInfo(size_t *in_usage, size_t *out_usage) { | ||
size_t i = 0, o = 0; | ||
for (int j = 0; j < CLIENTS_PEAK_MEM_USAGE_SLOTS; j++) { | ||
if (ClientsPeakMemInput[j] > i) i = ClientsPeakMemInput[j]; | ||
|
@@ -1046,38 +1046,23 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { | |
*out_usage = o; | ||
} | ||
|
||
/* This function is called by serverCron() and is used in order to perform | ||
/* This function is called by clientsTimeProc() and is used in order to perform | ||
* operations on clients that are important to perform constantly. For instance | ||
* we use this function in order to disconnect clients after a timeout, including | ||
* clients blocked in some blocking command with a non-zero timeout. | ||
* | ||
* The function makes some effort to process all the clients every second, even | ||
* if this cannot be strictly guaranteed, since serverCron() may be called with | ||
* an actual frequency lower than server.hz in case of latency events like slow | ||
* if this cannot be strictly guaranteed, since clientsTimeProc() may be called with | ||
* an actual frequency lower than the intended rate in case of latency events like slow | ||
* commands. | ||
* | ||
* It is very important for this function, and the functions it calls, to be | ||
* very fast: sometimes the server has tens of hundreds of connected clients, and the | ||
* default server.hz value is 10, so sometimes here we need to process thousands | ||
* of clients per second, turning this function into a source of latency. | ||
* very fast. Sometimes the server has tens of thousands of connected clients, and all | ||
* of them need to be processed every second. | ||
*/ | ||
#define CLIENTS_CRON_MIN_ITERATIONS 5 | ||
void clientsCron(void) { | ||
/* Try to process at least numclients/server.hz of clients | ||
* per call. Since normally (if there are no big latency events) this | ||
* function is called server.hz times per second, in the average case we | ||
* process all the clients in 1 second. */ | ||
int numclients = listLength(server.clients); | ||
int iterations = numclients / server.hz; | ||
static void clientsCron(int clients_this_cycle) { | ||
mstime_t now = mstime(); | ||
|
||
/* Process at least a few clients while we are at it, even if we need | ||
* to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract | ||
* of processing each client once per second. */ | ||
if (iterations < CLIENTS_CRON_MIN_ITERATIONS) | ||
iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ? numclients : CLIENTS_CRON_MIN_ITERATIONS; | ||
|
||
|
||
int curr_peak_mem_usage_slot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS; | ||
/* Always zero the next sample, so that when we switch to that second, we'll | ||
* only register samples that are greater in that second without considering | ||
|
@@ -1088,14 +1073,13 @@ void clientsCron(void) { | |
* some slow command is called taking multiple seconds to execute. In that | ||
* case our array may end containing data which is potentially older | ||
* than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem | ||
* since here we want just to track if "recently" there were very expansive | ||
* since here we want just to track if "recently" there were very expensive | ||
* clients from the POV of memory usage. */ | ||
int zeroidx = (curr_peak_mem_usage_slot + 1) % CLIENTS_PEAK_MEM_USAGE_SLOTS; | ||
ClientsPeakMemInput[zeroidx] = 0; | ||
ClientsPeakMemOutput[zeroidx] = 0; | ||
|
||
|
||
while (listLength(server.clients) && iterations--) { | ||
while (listLength(server.clients) && clients_this_cycle--) { | ||
client *c; | ||
listNode *head; | ||
|
||
|
@@ -1105,14 +1089,14 @@ void clientsCron(void) { | |
c = listNodeValue(head); | ||
listRotateHeadToTail(server.clients); | ||
if (c->io_read_state != CLIENT_IDLE || c->io_write_state != CLIENT_IDLE) continue; | ||
|
||
/* The following functions do different service checks on the client. | ||
* The protocol is that they return non-zero if the client was | ||
* terminated. */ | ||
if (clientsCronHandleTimeout(c, now)) continue; | ||
if (clientsCronResizeQueryBuffer(c)) continue; | ||
if (clientsCronResizeOutputBuffer(c, now)) continue; | ||
|
||
if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue; | ||
if (clientsCronTrackExpensiveClients(c, curr_peak_mem_usage_slot)) continue; | ||
|
||
/* Iterating all the clients in getMemoryOverheadData() is too slow and | ||
* in turn would make the INFO command too slow. So we perform this | ||
|
@@ -1126,6 +1110,42 @@ void clientsCron(void) { | |
} | ||
} | ||
|
||
/* A periodic timer that performs client maintenance. | ||
* This cron task follows the following rules: | ||
* - To manage latency, we don't check more than MAX_CLIENTS_PER_CLOCK_TICK at a time | ||
* - The minimum rate will be defined by server.hz | ||
* - The maximum rate will be defined by CONFIG_MAX_HZ | ||
* - At least CLIENTS_CRON_MIN_ITERATIONS will be performed each cycle (or all clients, if there are fewer than that) | ||
* - All clients need to be checked (at least) once per second (if possible given other constraints) | ||
* | ||
* How the per-cycle budget and the delay are derived: | ||
* - The budget starts as numclients / server.hz. | ||
* - If that is below CLIENTS_CRON_MIN_ITERATIONS, it becomes min(numclients, CLIENTS_CRON_MIN_ITERATIONS). | ||
* - If it exceeds MAX_CLIENTS_PER_CLOCK_TICK, it is capped at that value and the rate is raised instead: | ||
*   required_hz = numclients / MAX_CLIENTS_PER_CLOCK_TICK, itself capped at CONFIG_MAX_HZ, and the delay | ||
*   is 1000.0 / required_hz (the fractional part is dropped when stored into the int delay_ms). | ||
* - Otherwise the delay is 1000 / server.hz. | ||
* | ||
* The eventLoop, id and clientData arguments are unused. | ||
* | ||
* Returns the computed delay in milliseconds after calling clientsCron() with the budget. | ||
* NOTE(review): presumably the event loop uses the return value as the time until the next | ||
* invocation of this timer -- confirm against the time event implementation. | ||
*/ | ||
#define CLIENTS_CRON_MIN_ITERATIONS 5 | ||
long long clientsTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) { | ||
UNUSED(eventLoop); | ||
UNUSED(id); | ||
UNUSED(clientData); | ||
|
||
int numclients = listLength(server.clients); | ||
int clients_this_cycle = numclients / server.hz; /* Initial computation based on standard hz */ | ||
int delay_ms; | ||
|
||
if (clients_this_cycle < CLIENTS_CRON_MIN_ITERATIONS) { | ||
clients_this_cycle = min(numclients, CLIENTS_CRON_MIN_ITERATIONS); | ||
} | ||
|
||
if (clients_this_cycle > MAX_CLIENTS_PER_CLOCK_TICK) { | ||
clients_this_cycle = MAX_CLIENTS_PER_CLOCK_TICK; | ||
float required_hz = (float)numclients / MAX_CLIENTS_PER_CLOCK_TICK; | ||
JimB123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (required_hz > CONFIG_MAX_HZ) required_hz = CONFIG_MAX_HZ; | ||
delay_ms = 1000.0 / required_hz; | ||
} else { | ||
delay_ms = 1000 / server.hz; | ||
} | ||
|
||
clientsCron(clients_this_cycle); | ||
|
||
return delay_ms; | ||
} | ||
|
||
/* This function handles 'background' operations we are required to do | ||
* incrementally in the databases, such as active key expiring, resizing, | ||
* rehashing. */ | ||
|
@@ -1346,19 +1366,6 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa | |
* handler if we don't return here fast enough. */ | ||
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); | ||
|
||
server.hz = server.config_hz; | ||
/* Adapt the server.hz value to the number of configured clients. If we have | ||
* many clients, we want to call serverCron() with an higher frequency. */ | ||
if (server.dynamic_hz) { | ||
while (listLength(server.clients) / server.hz > MAX_CLIENTS_PER_CLOCK_TICK) { | ||
server.hz *= 2; | ||
if (server.hz > CONFIG_MAX_HZ) { | ||
server.hz = CONFIG_MAX_HZ; | ||
break; | ||
} | ||
} | ||
} | ||
|
||
/* for debug purposes: skip actual cron work if pause_cron is on */ | ||
if (server.pause_cron) return 1000 / server.hz; | ||
|
||
|
@@ -1444,9 +1451,6 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa | |
} | ||
} | ||
|
||
/* We need to do a few operations on clients asynchronously. */ | ||
clientsCron(); | ||
|
||
/* Handle background operations on databases. */ | ||
databasesCron(); | ||
|
||
|
@@ -2629,7 +2633,6 @@ void initServer(void) { | |
/* Initialization after setting defaults from the config system. */ | ||
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF; | ||
server.fsynced_reploff = server.aof_enabled ? 0 : -1; | ||
server.hz = server.config_hz; | ||
server.in_fork_child = CHILD_TYPE_NONE; | ||
server.rdb_pipe_read = -1; | ||
server.rdb_child_exit_pipe = -1; | ||
|
@@ -2775,10 +2778,15 @@ void initServer(void) { | |
server.acl_info.invalid_channel_accesses = 0; | ||
|
||
/* Create the timer callback, this is our way to process many background | ||
* operations incrementally, like clients timeout, eviction of unaccessed | ||
* expired keys and so forth. */ | ||
* operations incrementally, like eviction of unaccessed expired keys, etc. */ | ||
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { | ||
serverPanic("Can't create event loop timers."); | ||
serverPanic("Can't create serverCron timer."); | ||
exit(1); | ||
} | ||
/* A separate timer for client maintenance. Runs at a variable speed depending | ||
* on the client count. */ | ||
if (aeCreateTimeEvent(server.el, 1, clientsTimeProc, NULL, NULL) == AE_ERR) { | ||
serverPanic("Can't create event clientsTimeProc timer."); | ||
exit(1); | ||
} | ||
|
||
|
@@ -5549,7 +5557,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { | |
"uptime_in_seconds:%I\r\n", (int64_t)uptime, | ||
"uptime_in_days:%I\r\n", (int64_t)(uptime / (3600 * 24)), | ||
"hz:%i\r\n", server.hz, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it's better to show There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure what you're asking/suggesting here. I don't think we can just change the definition of the existing metric. I think maybe you're suggesting a new metric? Currently the required hz for the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, a new metric, since the |
||
"configured_hz:%i\r\n", server.config_hz, | ||
"configured_hz:%i\r\n", server.hz, | ||
"lru_clock:%u\r\n", server.lruclock, | ||
"executable:%s\r\n", server.executable ? server.executable : "", | ||
"config_file:%s\r\n", server.configfile ? server.configfile : "", | ||
|
@@ -5570,7 +5578,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { | |
if (all_sections || (dictFind(section_dict, "clients") != NULL)) { | ||
size_t maxin, maxout; | ||
unsigned long blocking_keys, blocking_keys_on_nokey, watched_keys; | ||
getExpansiveClientsInfo(&maxin, &maxout); | ||
getExpensiveClientsInfo(&maxin, &maxout); | ||
totalNumberOfStatefulKeys(&blocking_keys, &blocking_keys_on_nokey, &watched_keys); | ||
if (sections++) info = sdscat(info, "\r\n"); | ||
info = sdscatprintf( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to align the name with MAX_CLIENTS_PER_CLOCK_TICK and put it into server.h together; we have the chance now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
server.h is a big dumping ground - and included almost everywhere. I don't see a good reason to take an existing define which is already localized and move it into a global namespace.
Note: I wanted to leave as much code untouched as possible (including this define). If that isn't a goal, I'd prefer making this a local const inside the function rather than moving the existing define to a more global namespace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to minimize the scope, we can put MAX_CLIENTS_PER_CLOCK_TICK here and change CLIENTS_CRON_MIN_ITERATIONS to MIN_CLIENTS_PER_CLOCK_TICK.