daemon/defer: add alternate UDP and non-UDP phases
Lukáš Ondráček committed Oct 7, 2024
1 parent 9d4ad75 commit fa4dfe7
Showing 1 changed file with 84 additions and 20 deletions.

daemon/defer.c (84 additions, 20 deletions)
@@ -17,17 +17,20 @@
 #define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
 
 #define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX
-#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS)) // -1 for synchronous, +1 for unverified
-#define UNVERIFIED_PRIORITY 1 // -1 synchronous, 1 async UDP, {0, 2, 3} other async
+#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1) // +1 for unverified
+#define PRIORITY_SYNC (-1) // no queue
+#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue
 
 #define KRU_CAPACITY (1<<10)
 #define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s of accounted time
 #define BASE_PRICE(nsec) ((uint64_t)MAX_DECAY * nsec / 1000000ll)
 // TODO reconsider time flow speed in KRU (currently sum of all-processes accounted time)
 
-#define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
-#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
-#define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediately in poll phase
+#define REQ_TIMEOUT 5000000 // ns (THREAD_CPUTIME), older deferred queries are dropped
+#define IDLE_TIMEOUT 1000000 // ns (THREAD_CPUTIME); if exceeded, continue processing after next poll phase
+#define PHASE_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); switch between udp, non-udp phases
+#define PHASE_NON_UDP_TIMEOUT 400000 // ns (THREAD_CPUTIME); after timeout or emptying queue
+#define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediately in poll phase
 
 #define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__)

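Note on the new queue layout: with the four LOADS_THRESHOLDS entries above, QUEUES_CNT expands to 5. Queues 0-3 order verified (non-UDP) requests by measured load, the last queue (PRIORITY_UDP == 4) holds unverified UDP requests, and PRIORITY_SYNC (-1) bypasses the queues entirely. A hypothetical compile-time check of that derivation, not part of the commit:

// Illustrative sanity checks only; both values follow from the macros above.
_Static_assert(QUEUES_CNT == 5, "4 load thresholds + 1 unverified UDP queue");
_Static_assert(PRIORITY_UDP == 4, "UDP occupies the last queue");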
@@ -53,6 +56,30 @@ protolayer_iter_ctx_queue_t queues[QUEUES_CNT];
 int waiting_requests = 0;
 int queue_ix = QUEUES_CNT; // MIN( last popped queue, first non-empty queue )
 
+enum phase {
+	PHASE_UDP     = 1,
+	PHASE_NON_UDP = 2,
+	PHASE_ANY     = PHASE_UDP | PHASE_NON_UDP
+} phase = PHASE_ANY;
+uint64_t phase_elapsed = 0;    // ns
+bool phase_accounting = false; // add accounted time to phase_elapsed on next call of defer_account
+
+static inline void phase_set(enum phase p) {
+	if (phase != p) {
+		phase_elapsed = 0;
+		phase = p;
+	}
+}
+static inline void phase_account(uint64_t nsec) {
+	kr_assert(phase != PHASE_ANY);
+	phase_elapsed += nsec;
+	if ((phase == PHASE_UDP) && (phase_elapsed > PHASE_UDP_TIMEOUT)) {
+		phase_set(PHASE_NON_UDP);
+	} else if ((phase == PHASE_NON_UDP) && (phase_elapsed > PHASE_NON_UDP_TIMEOUT)) {
+		phase_set(PHASE_UDP);
+	}
+}
+
 struct pl_defer_iter_data {
 	struct protolayer_data h;
 	uint64_t req_stamp; // time when request was received, uses get_stamp()
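A minimal standalone sketch of the ping-pong above, not from the commit: phase_account() charges processing time to the active phase and flips it once the 400 us budget is exceeded. The names mirror the diff; the 150 us tick and the main() harness are illustrative only.

#include <stdint.h>
#include <stdio.h>

#define PHASE_UDP_TIMEOUT     400000 // ns
#define PHASE_NON_UDP_TIMEOUT 400000 // ns

enum phase { PHASE_UDP = 1, PHASE_NON_UDP = 2, PHASE_ANY = 3 };
static enum phase phase = PHASE_UDP;
static uint64_t phase_elapsed = 0; // ns spent in the current phase

static void phase_set(enum phase p) {
	if (phase != p) {
		phase_elapsed = 0; // budget restarts on every switch
		phase = p;
	}
}
static void phase_account(uint64_t nsec) {
	phase_elapsed += nsec;
	if (phase == PHASE_UDP && phase_elapsed > PHASE_UDP_TIMEOUT)
		phase_set(PHASE_NON_UDP);
	else if (phase == PHASE_NON_UDP && phase_elapsed > PHASE_NON_UDP_TIMEOUT)
		phase_set(PHASE_UDP);
}

int main(void) {
	for (int i = 1; i <= 9; i++) {
		phase_account(150000); // charge 150 us of work
		printf("after %d us: %s\n", i * 150,
		       phase == PHASE_UDP ? "UDP" : "non-UDP");
	}
	return 0;
}

Each phase survives two 150 us charges and flips on the third (450 us > 400 us), so the output alternates every three iterations.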
@@ -69,6 +96,11 @@ static bool using_avx2(void)
 
 /// Increment KRU counters by given time.
 void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
+	if (phase_accounting) {
+		phase_account(nsec);
+		phase_accounting = false;
+	}
+
 	_Alignas(16) uint8_t key[16] = {0, };
 	uint16_t max_load = 0;
 	uint8_t prefix = 0;
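Note: phase time is charged lazily here. The processing paths below only set the phase_accounting flag; the next defer_account() call then adds the measured CPU time to phase_elapsed (possibly switching the phase) before pricing the same nanoseconds into the KRU counters, so both mechanisms share one measurement.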
@@ -113,11 +145,17 @@ void defer_account(uint64_t nsec, union kr_sockaddr *addr) {
 
 /// Determine priority of the request in [-1, QUEUES_CNT - 1].
 /// Lower value has higher priority, -1 should be synchronous.
+/// Both UDP and non-UDP may end up with synchronous priority
+/// if the phase is active and no requests can be scheduled before them.
 static inline int classify(const union kr_sockaddr *addr, bool stream)
 {
-	if (!stream) {
+	if (!stream) { // UDP
 		VERBOSE_LOG(" unverified address\n");
-		return UNVERIFIED_PRIORITY; // UDP
+		if ((phase & PHASE_UDP) && (queue_len(queues[PRIORITY_UDP]) == 0)) {
+			phase_set(PHASE_UDP);
+			return PRIORITY_SYNC;
+		}
+		return PRIORITY_UDP;
 	}
 
 	uint32_t time_now = atomic_load_explicit(&defer->time_now, memory_order_relaxed);
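Note: this is the synchronous fast path for UDP. While a UDP phase is allowed (PHASE_UDP or PHASE_ANY) and no other UDP queries are waiting, an incoming UDP query gets PRIORITY_SYNC and is processed immediately, pinning the phase to PHASE_UDP; otherwise it joins the last queue.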
@@ -134,15 +172,15 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
 			0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
 	}
 
-	int threshold_index = 0; // 0: synchronous
-	for (; LOADS_THRESHOLDS[threshold_index] < max_load; threshold_index++);
+	int priority = 0;
+	for (; LOADS_THRESHOLDS[priority] < max_load; priority++);
 
 	VERBOSE_LOG(" load %d on /%d\n", max_load, prefix);
 
-	int priority = threshold_index - 1;
-	if (priority >= UNVERIFIED_PRIORITY)
-		priority++;
-
+	if ((phase & PHASE_NON_UDP) && (priority == 0) && (queue_len(queues[0]) == 0)) {
+		phase_set(PHASE_NON_UDP);
+		return PRIORITY_SYNC;
+	}
 	return priority;
 }

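Note: for stream (non-UDP) queries the priority now comes directly from the threshold index, 0-3, so the old shift around UNVERIFIED_PRIORITY is gone; UDP lives in the last queue instead. The fast path is symmetric to the UDP one: a lowest-load stream query runs synchronously during an allowed non-UDP phase when queue 0 is empty.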
@@ -160,14 +198,36 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority)
 	}
 }
 
-/// Pop and return the query with the highest priority, deactivate idle if not needed.
+/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
+/// deactivate idle if not needed.
 static inline struct protolayer_iter_ctx *pop_query(void)
 {
-	for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
-	if (queue_ix >= QUEUES_CNT) return NULL;
+	const int waiting_udp = queue_len(queues[PRIORITY_UDP]);
+	const int waiting_non_udp = waiting_requests - waiting_udp;
+
+	enum phase new_phase;
+	if ((phase & PHASE_NON_UDP) && (waiting_non_udp > 0)) {
+		new_phase = PHASE_NON_UDP; // maybe changing from PHASE_ANY
+	} else if ((phase & PHASE_UDP) && (waiting_udp > 0)) {
+		new_phase = PHASE_UDP; // maybe changing from PHASE_ANY
+	} else if (waiting_non_udp > 0) {
+		new_phase = PHASE_NON_UDP; // change from PHASE_UDP, no UDP queries
+	} else {
+		new_phase = PHASE_UDP; // change from PHASE_NON_UDP, no non-UDP queries
+	}
+	phase_set(new_phase);
+
+	int i;
+	if (phase == PHASE_NON_UDP) {
+		for (; queue_ix < QUEUES_CNT && queue_len(queues[queue_ix]) == 0; queue_ix++);
+		if (queue_ix >= PRIORITY_UDP) kr_assert(false);
+		i = queue_ix;
+	} else {
+		i = PRIORITY_UDP;
+	}
 
-	struct protolayer_iter_ctx *ctx = queue_head(queues[queue_ix]);
-	queue_pop(queues[queue_ix]);
+	struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
+	queue_pop(queues[i]);
 	if (--waiting_requests <= 0) {
 		kr_assert(waiting_requests == 0);
 		uv_idle_stop(&idle_handle);
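Note: pop_query() keeps the current phase as long as that side has waiting work (preferring non-UDP when both are allowed under PHASE_ANY) and switches only when the active side's queues are empty; the resulting phase then decides whether to scan priority queues 0-3 or pop from the UDP queue directly.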
@@ -184,6 +244,7 @@ static inline void process_single_deferred(void) {
 	if (ctx == NULL) return;
 
 	defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
+	phase_accounting = true;
 
 	struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
 	uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;

if (priority == -1) {
VERBOSE_LOG(" CONTINUE\n");
phase_accounting = true;
return protolayer_continue(ctx);
}

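Note: priority == -1 is PRIORITY_SYNC; such contexts continue through the protolayer without queuing, and phase_accounting ensures their CPU time is still charged to the active phase by the next defer_account() call.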
@@ -260,10 +322,12 @@ static void defer_queues_idle(uv_idle_t *handle) {
 		defer_sample_restart();
 	}
 	defer_sample_stop(); // TODO skip calling and use just restart elsewhere?
-	udp_queue_send_all(); // TODO keep here or call after processing each priority level?
-	// (or after UNVERIFIED_PRIORITY but beware future QUIC)
+	udp_queue_send_all();
 
 	if (waiting_requests > 0) {
 		VERBOSE_LOG(" %d waiting\n", waiting_requests);
+	} else {
+		phase_set(PHASE_ANY);
 	}
 	VERBOSE_LOG("POLL\n");
 }
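Note: when the queues drain completely, the idle handler resets the phase to PHASE_ANY, so the next incoming query of either kind starts a fresh phase instead of waiting out the previous budget.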
