Skip to content

Commit

Permalink
replace task timeout with resilience4j timelimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
EddeCCC committed Dec 17, 2024
1 parent 1123a7a commit afe5745
Show file tree
Hide file tree
Showing 18 changed files with 191 additions and 405 deletions.
6 changes: 4 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ orgSpringframework = "5.3.39"
# @pin 2.7.18 is the latest release which runs on Java 8, this version marks the en of open source support for Sb 2.x
orgSpringframeworkBoot = "2.7.18"
orgTestcontainers = "1.20.3"
# @pin resilience4j 2.* uses Java 17 APIs
ioGithubResilience4j = "1.7.1"

[libraries]
# @pin 1.3.* are the latest versions, which support java 8
Expand All @@ -42,8 +44,8 @@ commonsBeanutils = "commons-beanutils:commons-beanutils:1.9.4"
commonsIo = "commons-io:commons-io:2.17.0"
ioGithubNetmikeyLogunitLogunitCore = { module = "io.github.netmikey.logunit:logunit-core", version.ref = "ioGithubNetmikeyLogunit" }
ioGithubNetmikeyLogunitLogunitLogback = { module = "io.github.netmikey.logunit:logunit-logback", version.ref = "ioGithubNetmikeyLogunit" }
# @pin resilience4j 2.* uses Java 17 APIs
ioGithubResilience4jResilience4jRetry = "io.github.resilience4j:resilience4j-retry:1.7.1"
ioGithubResilience4jResilience4jRetry = { module = "io.github.resilience4j:resilience4j-retry", version.ref = "ioGithubResilience4j" }
ioGithubResilience4jResilience4jTimelimiter = { module = "io.github.resilience4j:resilience4j-timelimiter", version.ref = "ioGithubResilience4j" }
ioGrpcGrpcNettyShaded = { module = "io.grpc:grpc-netty-shaded", version.ref = "ioGrpc" }
ioGrpcGrpcStub = { module = "io.grpc:grpc-stub", version.ref = "ioGrpc" }
# The following dependency is required for the OC-exporter to work correctly and must be matched against the grpc version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ public class AgentCommandSettings {
*/
private Duration pollingInterval;

/**
* The task timeout to use - the time to run one agent command task.
*/
private Duration taskTimeout;

/**
* How long the agent will stay in the live mode, before falling back to the normal mode.
*/
Expand All @@ -92,16 +87,14 @@ public class AgentCommandSettings {

@AssertFalse(message = "The specified time values should not be negative!")
public boolean isNegativeTimeout() {
boolean negativeTimeout = taskTimeout != null && taskTimeout.isNegative();
boolean negativeLiveConnectionTimeout = liveConnectionTimeout != null && liveConnectionTimeout.isNegative();
boolean negativeConnectionTimeout = connectionTimeout != null && connectionTimeout.isNegative();
boolean negativeLiveConnectionRequestTimeout = liveConnectionRequestTimeout != null && liveConnectionRequestTimeout.isNegative();
boolean negativeConnectionRequestTimeout = connectionRequestTimeout != null && connectionRequestTimeout.isNegative();
boolean negativeLiveSocketTimeout = liveSocketTimeout != null && liveSocketTimeout.isNegative();
boolean negativeSocketTimeout = socketTimeout != null && socketTimeout.isNegative();
boolean negativeTTL = timeToLive != null && timeToLive.isNegative();
return negativeTimeout ||
negativeLiveConnectionTimeout || negativeConnectionTimeout ||
return negativeLiveConnectionTimeout || negativeConnectionTimeout ||
negativeLiveConnectionRequestTimeout || negativeConnectionRequestTimeout ||
negativeLiveSocketTimeout || negativeSocketTimeout
|| negativeTTL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ public class HttpConfigSettings {
@NonNull
private Duration frequency;

/**
* The task timeout to use - the time to run one HTTP polling task.
*/
private Duration taskTimeout;

/**
* The connection timeout to use - the time to establish the connection with the remote host.
*/
Expand Down Expand Up @@ -81,13 +76,11 @@ public class HttpConfigSettings {

@AssertFalse(message = "The specified time values should not be negative!")
public boolean isNegativeTimeout() {
boolean negativeTimeout = taskTimeout != null && taskTimeout.isNegative();
boolean negativeConnectionTimeout = connectionTimeout != null && connectionTimeout.isNegative();
boolean negativeConnectionRequestTimeout = connectionRequestTimeout != null && connectionRequestTimeout.isNegative();
boolean negativeReadTimeout = socketTimeout != null && socketTimeout.isNegative();
boolean negativeTTL = timeToLive != null && timeToLive.isNegative();
return negativeTimeout ||
negativeConnectionTimeout || negativeConnectionRequestTimeout || negativeReadTimeout ||
return negativeConnectionTimeout || negativeConnectionRequestTimeout || negativeReadTimeout ||
negativeTTL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,10 @@ public class RetrySettings {
@NotNull
// We use a BigDecimal as there is no support for double in hibernate validator
private BigDecimal randomizationFactor;

/**
* The maximum amount of time one retry is allowed to take. May not be lower than 1
*/
@DurationMin(millis = 0, inclusive = false)
private Duration timeLimit;
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ inspectit:
multiplier: 2
# This factor introduces randomness to what the actual wait interval will be. This prevents that a lot of agents will issue requests towards the configuration server at the same time.
randomization-factor: 0.1
# The maximum duration one retry may take
time-limit: 32m

log-preloading:
enabled: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ inspectit:
# true if retries are enabled, false otherwise
enabled: true
# The maximum number of attempts to try to fetch the configuration
max-attempts: 7
max-attempts: 6
# The initial interval to wait after the first failed attempt.
initial-interval: 30s
# For each retry the last interval to wait is multiplied with this number to calculate the next interval to wait
multiplier: 2
# This factor introduces randomness to what the actual wait interval will be. This prevents that a lot of agents will issue requests towards the configuration server at the same time.
randomization-factor: 0.1
# The maximum duration one retry may take
time-limit: 32m
1 change: 1 addition & 0 deletions inspectit-ocelot-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ dependencies {
libs.piccolo,
libs.comFasterxmlJacksonCoreJacksonDatabind,
libs.ioGithubResilience4jResilience4jRetry,
libs.ioGithubResilience4jResilience4jTimelimiter,

libs.orgJavassist,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
import org.springframework.stereotype.Service;
import rocks.inspectit.ocelot.config.model.InspectitConfig;
import rocks.inspectit.ocelot.config.model.command.AgentCommandSettings;
import rocks.inspectit.ocelot.core.config.propertysources.http.TaskTimeoutExecutor;
import rocks.inspectit.ocelot.core.service.DynamicallyActivatableService;

import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,24 +36,8 @@ public class AgentCommandService extends DynamicallyActivatableService implement
*/
private ScheduledFuture<?> handlerFuture;

/**
* The interval for the scheduled task.
*/
private Duration pollingInterval;

/**
* The executor to cancel the polling task by timeout. This should prevent the HTTP thread to deadlock.
*/
private final TaskTimeoutExecutor timeoutExecutor;

/**
* The maximum time to run one polling task.
*/
private Duration pollingTimeout;

public AgentCommandService() {
super("agentCommands");
timeoutExecutor = new TaskTimeoutExecutor();
}

@Override
Expand Down Expand Up @@ -87,17 +69,17 @@ protected boolean doEnable(InspectitConfig configuration) {
}

AgentCommandSettings settings = configuration.getAgentCommands();
pollingInterval = settings.getPollingInterval();
pollingTimeout = settings.getTaskTimeout();
startScheduledHandler();
long pollingIntervalMs = settings.getPollingInterval().toMillis();

handlerFuture = executor.scheduleWithFixedDelay(this, pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS);

return true;
}

@Override
protected boolean doDisable() {
log.info("Stopping agent command polling service.");
cancelTimeout();

if (handlerFuture != null) {
handlerFuture.cancel(true);
}
Expand All @@ -109,24 +91,11 @@ public void run() {
log.debug("Trying to fetch new agent commands.");
try {
commandHandler.nextCommand();
// After the command was fetched, the task should no longer timeout
cancelTimeout();
} catch (Exception exception) {
log.error("Error while fetching agent command.", exception);
}
}

/**
* Start the scheduled fetching of the next agent command.
*/
private void startScheduledHandler() {
handlerFuture = executor.scheduleWithFixedDelay(this,
pollingInterval.toMillis(), pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
// Setup timeout for fetching a command
if (pollingTimeout != null && !pollingTimeout.isZero())
timeoutExecutor.scheduleCancelling(handlerFuture, "agentcommand", this::startScheduledHandler, pollingTimeout);
}

@VisibleForTesting
URI getCommandUri(InspectitConfig configuration) throws URISyntaxException {
AgentCommandSettings settings = configuration.getAgentCommands();
Expand All @@ -149,11 +118,4 @@ URI getCommandUri(InspectitConfig configuration) throws URISyntaxException {
return settings.getUrl().toURI();
}
}

@VisibleForTesting
void cancelTimeout() {
if (timeoutExecutor != null) {
timeoutExecutor.cancelTimeout();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.timelimiter.TimeLimiter;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
Expand All @@ -17,6 +18,9 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Component which handles the fetching of new agent commands and execution of it.
Expand Down Expand Up @@ -56,7 +60,7 @@ public class CommandHandler {
/**
* Tries fetching and executing a new agent command from the server.
*/
public void nextCommand() {
public void nextCommand() throws Exception {
nextCommand(null);
}

Expand All @@ -67,7 +71,7 @@ public void nextCommand() {
*
* @param payload a {@link CommandResponse} to send with the next request
*/
private void nextCommand(CommandResponse payload) {
private void nextCommand(CommandResponse payload) throws Exception {
CommandResponse commandResponse = payload;

do {
Expand Down Expand Up @@ -101,10 +105,22 @@ private boolean isLiveModeExpired() {
return System.currentTimeMillis() >= liveModeStart + settings.getLiveModeDuration().toMillis();
}

private Command getCommandWithRetry(CommandResponse commandResponse) {
private Command getCommandWithRetry(CommandResponse commandResponse) throws Exception {
Retry retry = buildRetry();
if (retry != null) {
return retry.executeSupplier(() -> getCommand(commandResponse));
Callable<Command> getCommand;

TimeLimiter timeLimiter = buildTimeLimiter();
if(timeLimiter != null) {
ExecutorService timeLimitExecutor = Executors.newSingleThreadExecutor();
// Use time limiter for every function call
getCommand = timeLimiter.decorateFutureSupplier(() -> timeLimitExecutor.submit(() -> getCommand(commandResponse)));
}
else getCommand = () -> getCommand(commandResponse);

Command command = retry.executeCallable(getCommand);
return command;

} else {
return getCommand(commandResponse);
}
Expand All @@ -116,6 +132,12 @@ private Retry buildRetry() {
.getRetry(), "agent-commands");
}

private TimeLimiter buildTimeLimiter() {
return RetryUtils.buildTimeLimiter(environment.getCurrentConfig()
.getAgentCommands()
.getRetry(), "agent-commands");
}

/**
* Fetches a command and processes the response.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package rocks.inspectit.ocelot.core.config.propertysources.http;

import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -11,7 +10,6 @@
import rocks.inspectit.ocelot.core.config.InspectitEnvironment;
import rocks.inspectit.ocelot.core.service.DynamicallyActivatableService;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -31,21 +29,6 @@ public class HttpConfigurationPoller extends DynamicallyActivatableService imple
*/
private ScheduledFuture<?> pollerFuture;

/**
* The interval for the scheduled task.
*/
private Duration pollingInterval;

/**
* The executor to cancel the polling task by timeout. This should prevent the HTTP thread to deadlock.
*/
private final TaskTimeoutExecutor timeoutExecutor;

/**
* The maximum time to run one polling task.
*/
private Duration pollingTimeout;

/**
* The state of the used HTTP property source configuration.
*/
Expand All @@ -54,7 +37,6 @@ public class HttpConfigurationPoller extends DynamicallyActivatableService imple

public HttpConfigurationPoller() {
super("config.http");
timeoutExecutor = new TaskTimeoutExecutor();
}

@Override
Expand All @@ -70,17 +52,15 @@ protected boolean doEnable(InspectitConfig configuration) {

currentState = new HttpPropertySourceState(InspectitEnvironment.HTTP_BASED_CONFIGURATION, httpSettings);

pollingInterval = httpSettings.getFrequency();
pollingTimeout = httpSettings.getTaskTimeout();
startScheduledPolling();
long frequencyMs = httpSettings.getFrequency().toMillis();
pollerFuture = executor.scheduleWithFixedDelay(this, frequencyMs, frequencyMs, TimeUnit.MILLISECONDS);

return true;
}

@Override
protected boolean doDisable() {
log.info("Stopping HTTP configuration polling service.");
cancelTimeout();
if (pollerFuture != null) {
pollerFuture.cancel(true);
}
Expand All @@ -94,10 +74,7 @@ protected boolean doDisable() {
@Override
public void run() {
log.debug("Updating HTTP property source.");
// Fetch configuration
boolean wasUpdated = currentState.update(false);
// After the configuration was fetched, the task should no longer timeout
cancelTimeout();
if (wasUpdated) {
env.updatePropertySources(propertySources -> {
if (propertySources.contains(InspectitEnvironment.HTTP_BASED_CONFIGURATION)) {
Expand All @@ -107,17 +84,6 @@ public void run() {
}
}

/**
* Start the scheduled HTTP polling.
*/
private void startScheduledPolling() {
pollerFuture = executor.scheduleWithFixedDelay(this,
pollingInterval.toMillis(), pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
// Setup timeout for fetching the configuration
if (pollingTimeout != null && !pollingTimeout.isZero())
timeoutExecutor.scheduleCancelling(pollerFuture, "http.config", this::startScheduledPolling, pollingTimeout);
}

public void updateAgentHealthState(AgentHealthState agentHealth) {
if (currentState != null) {
currentState.updateAgentHealthState(agentHealth);
Expand All @@ -128,11 +94,4 @@ public AgentHealthState getCurrentAgentHealthState() {
if(currentState == null) return null;
return currentState.getAgentHealth();
}

@VisibleForTesting
void cancelTimeout() {
if (timeoutExecutor != null) {
timeoutExecutor.cancelTimeout();
}
}
}
Loading

0 comments on commit afe5745

Please sign in to comment.