Skip to content

Commit

Permalink
daemon/defer: cleanup heads of queues after idle
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukáš Ondráček committed Oct 14, 2024
1 parent ad604a7 commit fefc174
Showing 1 changed file with 56 additions and 27 deletions.
83 changes: 56 additions & 27 deletions daemon/defer.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,21 @@ static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, boo
}
}

/// Pop and return query from the specified queue, deactivate idle if not needed.
static inline struct protolayer_iter_ctx *pop_query_queue(int priority)
{
kr_assert(queue_len(queues[priority]) > 0);
struct protolayer_iter_ctx *ctx = queue_head(queues[priority]);
queue_pop(queues[priority]);
if (--waiting_requests <= 0) {
kr_assert(waiting_requests == 0);
uv_idle_stop(&idle_handle);
VERBOSE_LOG(" deactivating idle\n");
}
return ctx;
}


/// Pop and return the query with the highest priority, UDP or non-UDP based on current phase,
/// deactivate idle if not needed.
static inline struct protolayer_iter_ctx *pop_query(void)
Expand Down Expand Up @@ -238,17 +253,28 @@ static inline struct protolayer_iter_ctx *pop_query(void)
i = PRIORITY_UDP;
}

struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
queue_pop(queues[i]);
if (--waiting_requests <= 0) {
kr_assert(waiting_requests == 0);
uv_idle_stop(&idle_handle);
VERBOSE_LOG(" deactivating idle\n");
}
return ctx;
return pop_query_queue(i);
}


// Break the given query; for streams break also all follow-up queries and force-close the stream.
static inline void break_query(struct protolayer_iter_ctx *ctx, int err)
{
if (ctx->session->stream) {
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
if (!ctx->session->closing) {
session2_force_close(ctx->session); // session is not freed here as iter contexts exist
}
queue_pop(sdata->queue);
while (queue_len(sdata->queue) > 0) {
protolayer_break(ctx, kr_error(err)); // session is not freed here as other contexts exist
ctx = queue_head(sdata->queue);
queue_pop(sdata->queue);
}
}
protolayer_break(ctx, kr_error(err));
}

/// Process a single deferred query (or defer again) if there is any.
/// Time accounting should have been just started, the stamp is used, accounted address is set.
static inline void process_single_deferred(void) {
Expand All @@ -269,30 +295,13 @@ static inline void process_single_deferred(void) {

if (ctx->session->closing) {
VERBOSE_LOG(" BREAK (session is closing)\n");
if (ctx->session->stream) {
queue_pop(sdata->queue);
while (queue_len(sdata->queue) > 0) {
protolayer_break(ctx, kr_error(ECANCELED)); // session is not freed here as other contexts exist
ctx = queue_head(sdata->queue);
queue_pop(sdata->queue);
}
}
protolayer_break(ctx, kr_error(ECANCELED));
break_query(ctx, ECANCELED);
return;
}

if (age_ns >= REQ_TIMEOUT) {
VERBOSE_LOG(" BREAK (timeout)\n");
if (ctx->session->stream) {
while (queue_len(sdata->queue) > 0) {
ctx = queue_head(sdata->queue);
queue_pop(sdata->queue);
protolayer_break(ctx, kr_error(ETIME)); // session is not freed here as it is not closing yet
}
session2_force_close(ctx->session);
} else {
protolayer_break(ctx, kr_error(ETIME));
}
break_query(ctx, ETIME);
return;
}

Expand All @@ -316,6 +325,25 @@ static inline void process_single_deferred(void) {
protolayer_continue(ctx);
}

/// Break expired requests at the beginning of queues, uses current stamp.
static inline void cleanup_queues(void) {
for (int i = 0; i < QUEUES_CNT; i++) {
int cnt = 0;
while (queue_len(queues[i]) > 0) {
struct protolayer_iter_ctx *ctx = queue_head(queues[i]);
struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
uint64_t age_ns = defer_sample_state.stamp - idata->req_stamp;
if (age_ns < REQ_TIMEOUT) break;
pop_query_queue(i);
break_query(ctx, ETIME);
cnt++;
}
if (cnt > 0) {
VERBOSE_LOG(" BREAK %d queries from %d\n", cnt, i);
}
}
}

/// Unwrap: defer or process the query synchronously.
/// Time accounting should have been started, the stamp is used, accounted address is set.
static enum protolayer_iter_cb_result pl_defer_unwrap(
Expand Down Expand Up @@ -372,6 +400,7 @@ static void defer_queues_idle(uv_idle_t *handle) {
process_single_deferred();
defer_sample_restart();
}
cleanup_queues();
defer_sample_stop(); // TODO skip calling and use just restart elsewhere?
udp_queue_send_all();

Expand Down

0 comments on commit fefc174

Please sign in to comment.