From afe57454e7ce7378b968e7587d97611a11f9de01 Mon Sep 17 00:00:00 2001 From: EddeCCC Date: Tue, 17 Dec 2024 10:58:08 +0100 Subject: [PATCH] replace task timeout with resilience4j timelimiter --- gradle/libs.versions.toml | 6 +- .../model/command/AgentCommandSettings.java | 9 +- .../model/config/HttpConfigSettings.java | 9 +- .../config/model/config/RetrySettings.java | 6 ++ .../ocelot/config/default/basics.yml | 2 + .../ocelot/config/default/config-sources.yml | 4 +- inspectit-ocelot-core/build.gradle | 1 + .../core/command/AgentCommandService.java | 46 +------- .../ocelot/core/command/CommandHandler.java | 30 +++++- .../http/HttpConfigurationPoller.java | 45 +------- .../http/HttpPropertySourceState.java | 54 ++++++++-- .../http/TaskTimeoutExecutor.java | 57 ---------- .../ocelot/core/utils/RetryUtils.java | 19 +++- .../core/command/AgentCommandServiceTest.java | 54 ---------- .../core/command/CommandHandlerTest.java | 10 +- .../http/HttpConfigurationPollerTest.java | 62 ----------- .../http/TaskTimeoutExecutorTest.java | 82 -------------- .../ocelot/core/utils/RetryUtilsTest.java | 100 +++++++++++++----- 18 files changed, 191 insertions(+), 405 deletions(-) delete mode 100644 inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/TaskTimeoutExecutor.java delete mode 100644 inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/config/propertysources/http/TaskTimeoutExecutorTest.java diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 9c7cc713ea..0909d55e6c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -21,6 +21,8 @@ orgSpringframework = "5.3.39" # @pin 2.7.18 is the latest release which runs on Java 8, this version marks the en of open source support for Sb 2.x orgSpringframeworkBoot = "2.7.18" orgTestcontainers = "1.20.3" +# @pin resilience4j 2.* uses Java 17 APIs +ioGithubResilience4j = "1.7.1" [libraries] # @pin 1.3.* are the latest versions, which support java 8 @@ 
-42,8 +44,8 @@ commonsBeanutils = "commons-beanutils:commons-beanutils:1.9.4" commonsIo = "commons-io:commons-io:2.17.0" ioGithubNetmikeyLogunitLogunitCore = { module = "io.github.netmikey.logunit:logunit-core", version.ref = "ioGithubNetmikeyLogunit" } ioGithubNetmikeyLogunitLogunitLogback = { module = "io.github.netmikey.logunit:logunit-logback", version.ref = "ioGithubNetmikeyLogunit" } -# @pin resilience4j 2.* uses Java 17 APIs -ioGithubResilience4jResilience4jRetry = "io.github.resilience4j:resilience4j-retry:1.7.1" +ioGithubResilience4jResilience4jRetry = { module = "io.github.resilience4j:resilience4j-retry", version.ref = "ioGithubResilience4j" } +ioGithubResilience4jResilience4jTimelimiter = { module = "io.github.resilience4j:resilience4j-timelimiter", version.ref = "ioGithubResilience4j" } ioGrpcGrpcNettyShaded = { module = "io.grpc:grpc-netty-shaded", version.ref = "ioGrpc" } ioGrpcGrpcStub = { module = "io.grpc:grpc-stub", version.ref = "ioGrpc" } # The following dependency is required for the OC-exporter to work correctly and must be matched against the grpc version diff --git a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/command/AgentCommandSettings.java b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/command/AgentCommandSettings.java index 288e736850..2c56e62313 100644 --- a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/command/AgentCommandSettings.java +++ b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/command/AgentCommandSettings.java @@ -74,11 +74,6 @@ public class AgentCommandSettings { */ private Duration pollingInterval; - /** - * The task timeout to use - the time to run one agent command task. - */ - private Duration taskTimeout; - /** * How long the agent will stay in the live mode, before falling back to the normal mode. 
*/ @@ -92,7 +87,6 @@ public class AgentCommandSettings { @AssertFalse(message = "The specified time values should not be negative!") public boolean isNegativeTimeout() { - boolean negativeTimeout = taskTimeout != null && taskTimeout.isNegative(); boolean negativeLiveConnectionTimeout = liveConnectionTimeout != null && liveConnectionTimeout.isNegative(); boolean negativeConnectionTimeout = connectionTimeout != null && connectionTimeout.isNegative(); boolean negativeLiveConnectionRequestTimeout = liveConnectionRequestTimeout != null && liveConnectionRequestTimeout.isNegative(); @@ -100,8 +94,7 @@ public boolean isNegativeTimeout() { boolean negativeLiveSocketTimeout = liveSocketTimeout != null && liveSocketTimeout.isNegative(); boolean negativeSocketTimeout = socketTimeout != null && socketTimeout.isNegative(); boolean negativeTTL = timeToLive != null && timeToLive.isNegative(); - return negativeTimeout || - negativeLiveConnectionTimeout || negativeConnectionTimeout || + return negativeLiveConnectionTimeout || negativeConnectionTimeout || negativeLiveConnectionRequestTimeout || negativeConnectionRequestTimeout || negativeLiveSocketTimeout || negativeSocketTimeout || negativeTTL; diff --git a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/config/HttpConfigSettings.java b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/config/HttpConfigSettings.java index f969a0b393..887edd7701 100644 --- a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/config/HttpConfigSettings.java +++ b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/config/HttpConfigSettings.java @@ -48,11 +48,6 @@ public class HttpConfigSettings { @NonNull private Duration frequency; - /** - * The task timeout to use - the time to run one HTTP polling task. - */ - private Duration taskTimeout; - /** * The connection timeout to use - the time to establish the connection with the remote host. 
*/ @@ -81,13 +76,11 @@ public class HttpConfigSettings { @AssertFalse(message = "The specified time values should not be negative!") public boolean isNegativeTimeout() { - boolean negativeTimeout = taskTimeout != null && taskTimeout.isNegative(); boolean negativeConnectionTimeout = connectionTimeout != null && connectionTimeout.isNegative(); boolean negativeConnectionRequestTimeout = connectionRequestTimeout != null && connectionRequestTimeout.isNegative(); boolean negativeReadTimeout = socketTimeout != null && socketTimeout.isNegative(); boolean negativeTTL = timeToLive != null && timeToLive.isNegative(); - return negativeTimeout || - negativeConnectionTimeout || negativeConnectionRequestTimeout || negativeReadTimeout || + return negativeConnectionTimeout || negativeConnectionRequestTimeout || negativeReadTimeout || negativeTTL; } } diff --git a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/config/RetrySettings.java b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/config/RetrySettings.java index f6f2e49f30..7ae66073e3 100644 --- a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/config/RetrySettings.java +++ b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/config/RetrySettings.java @@ -42,4 +42,10 @@ public class RetrySettings { @NotNull // We use a BigDecimal as there is no support for double in hibernate validator private BigDecimal randomizationFactor; + + /** + * The maximum amount of time one retry is allowed to take. 
Must be greater than zero. + */ + @DurationMin(millis = 0, inclusive = false) + private Duration timeLimit; } diff --git a/inspectit-ocelot-config/src/main/resources/rocks/inspectit/ocelot/config/default/basics.yml b/inspectit-ocelot-config/src/main/resources/rocks/inspectit/ocelot/config/default/basics.yml index 4e82d5ae20..2e28b0c997 100644 --- a/inspectit-ocelot-config/src/main/resources/rocks/inspectit/ocelot/config/default/basics.yml +++ b/inspectit-ocelot-config/src/main/resources/rocks/inspectit/ocelot/config/default/basics.yml @@ -151,6 +151,8 @@ inspectit: multiplier: 2 # This factor introduces randomness to what the actual wait interval will be. This prevents that a lot of agents will issue requests towards the configuration server at the same time. randomization-factor: 0.1 + # The maximum duration one retry may take + time-limit: 32m log-preloading: enabled: false diff --git a/inspectit-ocelot-config/src/main/resources/rocks/inspectit/ocelot/config/default/config-sources.yml b/inspectit-ocelot-config/src/main/resources/rocks/inspectit/ocelot/config/default/config-sources.yml index 8c52c0ef4e..80c5907ef3 100644 --- a/inspectit-ocelot-config/src/main/resources/rocks/inspectit/ocelot/config/default/config-sources.yml +++ b/inspectit-ocelot-config/src/main/resources/rocks/inspectit/ocelot/config/default/config-sources.yml @@ -35,10 +35,12 @@ inspectit: # true if retries are enabled, false otherwise enabled: true # The maximum number of attempts to try to fetch the configuration - max-attempts: 7 + max-attempts: 6 # The initial interval to wait after the first failed attempt. initial-interval: 30s # For each retry the last interval to wait is multiplied with this number to calculate the next interval to wait multiplier: 2 # This factor introduces randomness to what the actual wait interval will be. This prevents that a lot of agents will issue requests towards the configuration server at the same time.
randomization-factor: 0.1 + # The maximum duration one retry may take + time-limit: 32m diff --git a/inspectit-ocelot-core/build.gradle b/inspectit-ocelot-core/build.gradle index 9f116d4bba..54c36043d5 100644 --- a/inspectit-ocelot-core/build.gradle +++ b/inspectit-ocelot-core/build.gradle @@ -87,6 +87,7 @@ dependencies { libs.piccolo, libs.comFasterxmlJacksonCoreJacksonDatabind, libs.ioGithubResilience4jResilience4jRetry, + libs.ioGithubResilience4jResilience4jTimelimiter, libs.orgJavassist, diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/command/AgentCommandService.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/command/AgentCommandService.java index 3ed2dedcea..361bcef548 100644 --- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/command/AgentCommandService.java +++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/command/AgentCommandService.java @@ -6,13 +6,11 @@ import org.springframework.stereotype.Service; import rocks.inspectit.ocelot.config.model.InspectitConfig; import rocks.inspectit.ocelot.config.model.command.AgentCommandSettings; -import rocks.inspectit.ocelot.core.config.propertysources.http.TaskTimeoutExecutor; import rocks.inspectit.ocelot.core.service.DynamicallyActivatableService; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; -import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -38,24 +36,8 @@ public class AgentCommandService extends DynamicallyActivatableService implement */ private ScheduledFuture handlerFuture; - /** - * The interval for the scheduled task. - */ - private Duration pollingInterval; - - /** - * The executor to cancel the polling task by timeout. This should prevent the HTTP thread to deadlock. 
- */ - private final TaskTimeoutExecutor timeoutExecutor; - - /** - * The maximum time to run one polling task. - */ - private Duration pollingTimeout; - public AgentCommandService() { super("agentCommands"); - timeoutExecutor = new TaskTimeoutExecutor(); } @Override @@ -87,9 +69,9 @@ protected boolean doEnable(InspectitConfig configuration) { } AgentCommandSettings settings = configuration.getAgentCommands(); - pollingInterval = settings.getPollingInterval(); - pollingTimeout = settings.getTaskTimeout(); - startScheduledHandler(); + long pollingIntervalMs = settings.getPollingInterval().toMillis(); + + handlerFuture = executor.scheduleWithFixedDelay(this, pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS); return true; } @@ -97,7 +79,7 @@ protected boolean doEnable(InspectitConfig configuration) { @Override protected boolean doDisable() { log.info("Stopping agent command polling service."); - cancelTimeout(); + if (handlerFuture != null) { handlerFuture.cancel(true); } @@ -109,24 +91,11 @@ public void run() { log.debug("Trying to fetch new agent commands."); try { commandHandler.nextCommand(); - // After the command was fetched, the task should no longer timeout - cancelTimeout(); } catch (Exception exception) { log.error("Error while fetching agent command.", exception); } } - /** - * Start the scheduled fetching of the next agent command. 
- */ - private void startScheduledHandler() { - handlerFuture = executor.scheduleWithFixedDelay(this, - pollingInterval.toMillis(), pollingInterval.toMillis(), TimeUnit.MILLISECONDS); - // Setup timeout for fetching a command - if (pollingTimeout != null && !pollingTimeout.isZero()) - timeoutExecutor.scheduleCancelling(handlerFuture, "agentcommand", this::startScheduledHandler, pollingTimeout); - } - @VisibleForTesting URI getCommandUri(InspectitConfig configuration) throws URISyntaxException { AgentCommandSettings settings = configuration.getAgentCommands(); @@ -149,11 +118,4 @@ URI getCommandUri(InspectitConfig configuration) throws URISyntaxException { return settings.getUrl().toURI(); } } - - @VisibleForTesting - void cancelTimeout() { - if (timeoutExecutor != null) { - timeoutExecutor.cancelTimeout(); - } - } } diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/command/CommandHandler.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/command/CommandHandler.java index e5fc9a8539..f6b73da0c6 100644 --- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/command/CommandHandler.java +++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/command/CommandHandler.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.timelimiter.TimeLimiter; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; @@ -17,6 +18,9 @@ import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Component which handles the fetching of new agent commands and execution of it. @@ -56,7 +60,7 @@ public class CommandHandler { /** * Tries fetching and executing a new agent command from the server. 
*/ - public void nextCommand() { + public void nextCommand() throws Exception { nextCommand(null); } @@ -67,7 +71,7 @@ public void nextCommand() { * * @param payload a {@link CommandResponse} to send with the next request */ - private void nextCommand(CommandResponse payload) { + private void nextCommand(CommandResponse payload) throws Exception { CommandResponse commandResponse = payload; do { @@ -101,10 +105,22 @@ private boolean isLiveModeExpired() { return System.currentTimeMillis() >= liveModeStart + settings.getLiveModeDuration().toMillis(); } - private Command getCommandWithRetry(CommandResponse commandResponse) { + private Command getCommandWithRetry(CommandResponse commandResponse) throws Exception { Retry retry = buildRetry(); if (retry != null) { - return retry.executeSupplier(() -> getCommand(commandResponse)); + Callable getCommand; + + TimeLimiter timeLimiter = buildTimeLimiter(); + if(timeLimiter != null) { + ExecutorService timeLimitExecutor = Executors.newSingleThreadExecutor(); + // Use time limiter for every function call + getCommand = timeLimiter.decorateFutureSupplier(() -> timeLimitExecutor.submit(() -> getCommand(commandResponse))); + } + else getCommand = () -> getCommand(commandResponse); + + Command command = retry.executeCallable(getCommand); + return command; + } else { return getCommand(commandResponse); } @@ -116,6 +132,12 @@ private Retry buildRetry() { .getRetry(), "agent-commands"); } + private TimeLimiter buildTimeLimiter() { + return RetryUtils.buildTimeLimiter(environment.getCurrentConfig() + .getAgentCommands() + .getRetry(), "agent-commands"); + } + /** * Fetches a command and processes the response. 
* diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpConfigurationPoller.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpConfigurationPoller.java index 69dbc512b7..212c0a6a1c 100644 --- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpConfigurationPoller.java +++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpConfigurationPoller.java @@ -1,6 +1,5 @@ package rocks.inspectit.ocelot.core.config.propertysources.http; -import com.google.common.annotations.VisibleForTesting; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -11,7 +10,6 @@ import rocks.inspectit.ocelot.core.config.InspectitEnvironment; import rocks.inspectit.ocelot.core.service.DynamicallyActivatableService; -import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -31,21 +29,6 @@ public class HttpConfigurationPoller extends DynamicallyActivatableService imple */ private ScheduledFuture pollerFuture; - /** - * The interval for the scheduled task. - */ - private Duration pollingInterval; - - /** - * The executor to cancel the polling task by timeout. This should prevent the HTTP thread to deadlock. - */ - private final TaskTimeoutExecutor timeoutExecutor; - - /** - * The maximum time to run one polling task. - */ - private Duration pollingTimeout; - /** * The state of the used HTTP property source configuration. 
*/ @@ -54,7 +37,6 @@ public class HttpConfigurationPoller extends DynamicallyActivatableService imple public HttpConfigurationPoller() { super("config.http"); - timeoutExecutor = new TaskTimeoutExecutor(); } @Override @@ -70,9 +52,8 @@ protected boolean doEnable(InspectitConfig configuration) { currentState = new HttpPropertySourceState(InspectitEnvironment.HTTP_BASED_CONFIGURATION, httpSettings); - pollingInterval = httpSettings.getFrequency(); - pollingTimeout = httpSettings.getTaskTimeout(); - startScheduledPolling(); + long frequencyMs = httpSettings.getFrequency().toMillis(); + pollerFuture = executor.scheduleWithFixedDelay(this, frequencyMs, frequencyMs, TimeUnit.MILLISECONDS); return true; } @@ -80,7 +61,6 @@ protected boolean doEnable(InspectitConfig configuration) { @Override protected boolean doDisable() { log.info("Stopping HTTP configuration polling service."); - cancelTimeout(); if (pollerFuture != null) { pollerFuture.cancel(true); } @@ -94,10 +74,7 @@ protected boolean doDisable() { @Override public void run() { log.debug("Updating HTTP property source."); - // Fetch configuration boolean wasUpdated = currentState.update(false); - // After the configuration was fetched, the task should no longer timeout - cancelTimeout(); if (wasUpdated) { env.updatePropertySources(propertySources -> { if (propertySources.contains(InspectitEnvironment.HTTP_BASED_CONFIGURATION)) { @@ -107,17 +84,6 @@ public void run() { } } - /** - * Start the scheduled HTTP polling. 
- */ - private void startScheduledPolling() { - pollerFuture = executor.scheduleWithFixedDelay(this, - pollingInterval.toMillis(), pollingInterval.toMillis(), TimeUnit.MILLISECONDS); - // Setup timeout for fetching the configuration - if (pollingTimeout != null && !pollingTimeout.isZero()) - timeoutExecutor.scheduleCancelling(pollerFuture, "http.config", this::startScheduledPolling, pollingTimeout); - } - public void updateAgentHealthState(AgentHealthState agentHealth) { if (currentState != null) { currentState.updateAgentHealthState(agentHealth); @@ -128,11 +94,4 @@ public AgentHealthState getCurrentAgentHealthState() { if(currentState == null) return null; return currentState.getAgentHealth(); } - - @VisibleForTesting - void cancelTimeout() { - if (timeoutExecutor != null) { - timeoutExecutor.cancelTimeout(); - } - } } diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpPropertySourceState.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpPropertySourceState.java index 9243fd81d6..23860a66aa 100644 --- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpPropertySourceState.java +++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpPropertySourceState.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import io.github.resilience4j.timelimiter.TimeLimiter; import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -39,6 +40,7 @@ import java.nio.file.attribute.FileTime; import java.util.Date; import java.util.Properties; +import java.util.concurrent.*; /** * Representing and managing the state of a HTTP-based agent configuration. 
@@ -199,7 +201,18 @@ private String fetchConfiguration(boolean fallBackToFile) { retry = buildRetry(); } if (retry != null) { - configuration = retry.executeCallable(() -> fetchConfiguration(httpClient, httpGet)); + Callable fetchConfiguration; + + TimeLimiter timeLimiter = buildTimeLimiter(); + if(timeLimiter != null) { + ExecutorService timeLimitExecutor = Executors.newSingleThreadExecutor(); + // Use time limiter for every function call + fetchConfiguration = timeLimiter.decorateFutureSupplier(() -> timeLimitExecutor.submit(fetchConfigurationCall(httpClient, httpGet))); + } + else fetchConfiguration = fetchConfigurationCall(httpClient, httpGet); + + // Execute function with retries + configuration = retry.executeCallable(fetchConfiguration); } else { configuration = fetchConfiguration(httpClient, httpGet); } @@ -208,9 +221,6 @@ private String fetchConfiguration(boolean fallBackToFile) { logFetchError("HTTP protocol error occurred while fetching configuration.", e); } catch (IOException e) { logFetchError("A IO problem occurred while fetching configuration.", e); - } catch (InterruptedException e) { - logFetchError("Thread was interrupted while fetching configuration.", e); - Thread.currentThread().interrupt(); } catch (Exception e) { logFetchError("Exception occurred while fetching configuration.", e); } @@ -230,6 +240,10 @@ private Retry buildRetry() { return RetryUtils.buildRetry(currentSettings.getRetry(), "http-property-source"); } + private TimeLimiter buildTimeLimiter() { + return RetryUtils.buildTimeLimiter(currentSettings.getRetry(), "http-property-source"); + } + private HttpGet buildRequest() throws URISyntaxException { URI uri = getEffectiveRequestUri(); log.debug("Updating configuration via HTTP from URL: {}", uri.toString()); @@ -241,21 +255,39 @@ private HttpGet buildRequest() throws URISyntaxException { if (latestETag != null) { httpGet.setHeader("If-None-Match", latestETag); } - setAgentMetaHeaders(httpGet); return httpGet; } + /** + * Wrapper 
for the method {@link #fetchConfiguration(HttpClient, HttpGet)}. + */ + private Callable fetchConfigurationCall(CloseableHttpClient httpClient, HttpGet httpGet) { + return () -> { + try { + return fetchConfiguration(httpClient, httpGet); + } catch (IOException e) { + throw new CouldNotFetchConfigurationException(e); + } + }; + } + + /** + * Execute HTTP request and read configuration + * + * @param client the client to execute the request + * @param request the request + * @return the fetched configuration string + */ private String fetchConfiguration(HttpClient client, HttpGet request) throws IOException { HttpResponse response = client.execute(request); - // get the config from the response + // Get the config from the response String configuration = processHttpResponse(response); if (errorCounter != 0) { log.info("Configuration fetch has been successful after {} unsuccessful attempts.", errorCounter); errorCounter = 0; } - return configuration; } @@ -405,4 +437,12 @@ private String readPersistenceFile() { return null; } + /** + * Exception for {@link #fetchConfigurationCall(CloseableHttpClient, HttpGet)} + */ + private static class CouldNotFetchConfigurationException extends RuntimeException { + CouldNotFetchConfigurationException(Exception e) { + super("Could not fetch HTTP configuration", e); + } + } } diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/TaskTimeoutExecutor.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/TaskTimeoutExecutor.java deleted file mode 100644 index fd6cb78b0e..0000000000 --- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/config/propertysources/http/TaskTimeoutExecutor.java +++ /dev/null @@ -1,57 +0,0 @@ -package rocks.inspectit.ocelot.core.config.propertysources.http; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import lombok.extern.slf4j.Slf4j; - -import java.time.Duration; -import 
java.util.concurrent.*; - -/** - * Cancels internal tasks after a timeout has been exceeded. - */ -@Slf4j -public class TaskTimeoutExecutor { - - /** - * The future to cancel the task by timeout. - */ - private Future timeoutExecutor; - - /** - * Cancel the task after a specific timeout. This should prevent, that the thread stays deadlocked. - * The task will be restarted after successful cancel. - * - * @param task the task to cancel by timeout - * @param taskName the name of the task - * @param restartTask the runnable to restart the task - * @param timeout the time after which the task should be cancelled - */ - public void scheduleCancelling(Future task, String taskName, Runnable restartTask, Duration timeout) { - ThreadFactory factory = new ThreadFactoryBuilder() - .setNameFormat("timeout-" + taskName) - .build(); - ScheduledExecutorService cancelExecutor = Executors.newSingleThreadScheduledExecutor(factory); - - // Execute when timeout is reached - Runnable cancelRunnable = () -> { - task.cancel(true); - boolean isCancelled = task.isCancelled(); - log.warn("Cancelled {}: {}", taskName, isCancelled); - if (isCancelled) { - log.info("Restarting {}...", taskName); - restartTask.run(); - } - }; - - // Schedule the cancelling just once - log.debug("Scheduling {} timeout with: {}", taskName, timeout); - timeoutExecutor = cancelExecutor.schedule(cancelRunnable, timeout.toMillis(), TimeUnit.MILLISECONDS); - } - - /** - * Cancel the timeout executor, if started - */ - public void cancelTimeout() { - if(timeoutExecutor != null) timeoutExecutor.cancel(true); - } -} diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/RetryUtils.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/RetryUtils.java index 2b2cd576c2..62844b2850 100644 --- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/RetryUtils.java +++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/RetryUtils.java @@ 
-3,12 +3,12 @@ import io.github.resilience4j.core.IntervalFunction; import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; +import io.github.resilience4j.timelimiter.TimeLimiter; +import io.github.resilience4j.timelimiter.TimeLimiterConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rocks.inspectit.ocelot.config.model.config.RetrySettings; -import java.time.temporal.ChronoUnit; - public final class RetryUtils { private static final Logger LOGGER = LoggerFactory.getLogger(RetryUtils.class); @@ -28,9 +28,22 @@ public static Retry buildRetry(RetrySettings retrySettings, String retryName) { retrySettings.getRandomizationFactor().doubleValue())) .build(); Retry retry = Retry.of(retryName, retryConfig); - retry.getEventPublisher().onRetry( event -> LOGGER.info("Retrying for {} in {}.", retryName, event.getWaitInterval())); + retry.getEventPublisher().onRetry(event -> LOGGER.info("Retrying for {} in {}.", retryName, event.getWaitInterval())); return retry; } return null; } + + public static TimeLimiter buildTimeLimiter(RetrySettings retrySettings, String timeLimiterName) { + if(retrySettings != null && retrySettings.isEnabled()) { + TimeLimiterConfig timeLimiterConfig = TimeLimiterConfig.custom() + .cancelRunningFuture(true) + .timeoutDuration(retrySettings.getTimeLimit()) + .build(); + TimeLimiter timeLimiter = TimeLimiter.of(timeLimiterName, timeLimiterConfig); + timeLimiter.getEventPublisher().onTimeout(event -> LOGGER.info("Time limit for {} was exceeded.", timeLimiterName)); + return timeLimiter; + } + return null; + } } diff --git a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/command/AgentCommandServiceTest.java b/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/command/AgentCommandServiceTest.java index 14d63c3f7c..8db0398d44 100644 --- a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/command/AgentCommandServiceTest.java +++ 
b/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/command/AgentCommandServiceTest.java @@ -1,6 +1,5 @@ package rocks.inspectit.ocelot.core.command; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -145,57 +144,4 @@ public void verifyPrioritization() throws Exception { assertThat(result.toString()).isEqualTo("http://example.org/api/command"); } } - - @Nested - public class TaskTimeout { - - @AfterEach - void cancelTimeout() { - service.cancelTimeout(); - } - - @Test - void shouldCancelFutureAndRestartWhenTimeoutExceeded() throws MalformedURLException { - Duration timeout = Duration.ofMillis(500); - when(configuration.getAgentCommands().getPollingInterval()).thenReturn(Duration.ofSeconds(5)); - when(configuration.getAgentCommands().getTaskTimeout()).thenReturn(timeout); - when(configuration.getAgentCommands().getUrl()).thenReturn(new URL("http://example.org")); - ScheduledFuture future = mock(ScheduledFuture.class); - when(future.isCancelled()).thenReturn(true); - when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); - - service.doEnable(configuration); - - verify(future, timeout(timeout.toMillis() + 100)).cancel(true); - verify(executor, times(2)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - } - - @Test - void shouldNotCancelFutureWhenNoTimeout() throws MalformedURLException { - Duration timeout = Duration.ofMillis(5000); - when(configuration.getAgentCommands().getPollingInterval()).thenReturn(Duration.ofMillis(500)); - when(configuration.getAgentCommands().getTaskTimeout()).thenReturn(timeout); - when(configuration.getAgentCommands().getUrl()).thenReturn(new URL("http://example.org")); - ScheduledFuture future = mock(ScheduledFuture.class); - when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), 
any(TimeUnit.class))).thenReturn(future); - - service.doEnable(configuration); - - verify(future, never()).cancel(true); - verify(executor, times(1)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - } - - @Test - void shouldNotCancelFutureWhenTimeoutIsZero() throws MalformedURLException { - when(configuration.getAgentCommands().getPollingInterval()).thenReturn(Duration.ofMillis(500)); - when(configuration.getAgentCommands().getUrl()).thenReturn(new URL("http://example.org")); - ScheduledFuture future = mock(ScheduledFuture.class); - when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); - - service.doEnable(configuration); - - verify(future, never()).cancel(true); - verify(executor, times(1)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - } - } } diff --git a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/command/CommandHandlerTest.java b/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/command/CommandHandlerTest.java index 6f49b2b20d..e8491ff62e 100644 --- a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/command/CommandHandlerTest.java +++ b/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/command/CommandHandlerTest.java @@ -69,7 +69,7 @@ public void beforeEach() { public class NextCommand { @Test - public void noCommand() throws IOException { + public void noCommand() throws Exception { when(httpResponse.getStatusLine().getStatusCode()).thenReturn(HttpStatus.SC_NO_CONTENT); when(commandFetcher.fetchCommand(any(), anyBoolean())).thenReturn(httpResponse); @@ -79,7 +79,7 @@ public void noCommand() throws IOException { } @Test - public void pingCommandSend() throws IOException { + public void pingCommandSend() throws Exception { environment.getCurrentConfig().getAgentCommands().setLiveModeDuration(Duration.ZERO); PingCommand command = new PingCommand(); 
PingCommand.Response pingResponse = new PingCommand.Response(); @@ -117,14 +117,14 @@ void setup() { } @Test - void succeedsIfFirstCommandHandlerCallSucceeds() throws IOException { + void succeedsIfFirstCommandHandlerCallSucceeds() throws Exception { handler.nextCommand(); verify(commandFetcher).fetchCommand(any(), anyBoolean()); } @Test - void retriesOnCommandFetcherException() throws IOException { + void retriesOnCommandFetcherException() throws Exception { when(commandFetcher.fetchCommand(any(), anyBoolean())) .thenThrow(IOException.class) .thenReturn(successfulResponse); @@ -135,7 +135,7 @@ void retriesOnCommandFetcherException() throws IOException { } @Test - void retriesOnUnsuccessfulHttpResponse() throws IOException { + void retriesOnUnsuccessfulHttpResponse() throws Exception { when(commandFetcher.fetchCommand(any(), anyBoolean())) .thenReturn(unsuccessfulResponse) .thenReturn(successfulResponse); diff --git a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpConfigurationPollerTest.java b/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpConfigurationPollerTest.java index f8142366d4..f495b9619a 100644 --- a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpConfigurationPollerTest.java +++ b/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/config/propertysources/http/HttpConfigurationPollerTest.java @@ -1,6 +1,5 @@ package rocks.inspectit.ocelot.core.config.propertysources.http; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -44,7 +43,6 @@ public void successfullyEnabled() { configuration.setConfig(new ConfigSettings()); configuration.getConfig().setHttp(new HttpConfigSettings()); configuration.getConfig().getHttp().setFrequency(Duration.ofMillis(5000L)); - 
configuration.getConfig().getHttp().setTaskTimeout(Duration.ofMillis(50000L)); ScheduledFuture future = Mockito.mock(ScheduledFuture.class); when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); @@ -73,7 +71,6 @@ public void isEnabled() { configuration.setConfig(new ConfigSettings()); configuration.getConfig().setHttp(new HttpConfigSettings()); configuration.getConfig().getHttp().setFrequency(Duration.ofMillis(5000L)); - configuration.getConfig().getHttp().setTaskTimeout(Duration.ofMillis(50000L)); ScheduledFuture future = Mockito.mock(ScheduledFuture.class); when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); @@ -120,63 +117,4 @@ public void stateUpdated() { verifyNoMoreInteractions(currentState, env); } } - - @Nested - public class TaskTimeout { - - @AfterEach - void cancelTimeout() { - poller.cancelTimeout(); - } - - @Test - void shouldCancelFutureAndRestartWhenTimeoutExceeded() { - Duration timeout = Duration.ofMillis(500); - InspectitConfig configuration = new InspectitConfig(); - configuration.setConfig(new ConfigSettings()); - configuration.getConfig().setHttp(new HttpConfigSettings()); - configuration.getConfig().getHttp().setFrequency(Duration.ofMillis(5000)); - configuration.getConfig().getHttp().setTaskTimeout(timeout); - ScheduledFuture future = Mockito.mock(ScheduledFuture.class); - when(future.isCancelled()).thenReturn(true); - when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); - - poller.doEnable(configuration); - - verify(future, timeout(timeout.toMillis() + 100)).cancel(true); - verify(executor, times(2)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - } - - @Test - void shouldNotCancelFutureWhenNoTimeout() { - Duration timeout = Duration.ofMillis(5000); - InspectitConfig configuration = new 
InspectitConfig(); - configuration.setConfig(new ConfigSettings()); - configuration.getConfig().setHttp(new HttpConfigSettings()); - configuration.getConfig().getHttp().setFrequency(Duration.ofMillis(500)); - configuration.getConfig().getHttp().setTaskTimeout(timeout); - ScheduledFuture future = Mockito.mock(ScheduledFuture.class); - when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); - - poller.doEnable(configuration); - - verify(future, never()).cancel(true); - verify(executor, times(1)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - } - - @Test - void shouldNotCancelFutureWhenTimeoutIsZero() { - InspectitConfig configuration = new InspectitConfig(); - configuration.setConfig(new ConfigSettings()); - configuration.getConfig().setHttp(new HttpConfigSettings()); - configuration.getConfig().getHttp().setFrequency(Duration.ofMillis(500)); - ScheduledFuture future = Mockito.mock(ScheduledFuture.class); - when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future); - - poller.doEnable(configuration); - - verify(future, never()).cancel(true); - verify(executor, times(1)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); - } - } } diff --git a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/config/propertysources/http/TaskTimeoutExecutorTest.java b/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/config/propertysources/http/TaskTimeoutExecutorTest.java deleted file mode 100644 index 8c04a49901..0000000000 --- a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/config/propertysources/http/TaskTimeoutExecutorTest.java +++ /dev/null @@ -1,82 +0,0 @@ -package rocks.inspectit.ocelot.core.config.propertysources.http; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import 
org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import java.time.Duration; -import java.util.concurrent.*; -import static org.mockito.Mockito.*; -import static org.junit.jupiter.api.Assertions.*; - -@ExtendWith(MockitoExtension.class) -public class TaskTimeoutExecutorTest { - - private TaskTimeoutExecutor timeoutExecutor; - - private Future task; - - @Mock - private Runnable restartTask; - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - - private final Duration timeout = Duration.ofMillis(1000); - - @BeforeEach - void beforeEach() { - timeoutExecutor = new TaskTimeoutExecutor(); - } - - @AfterEach - void afterEach() { - task.cancel(true); - } - - @Test - void shouldCancelTaskAndRestartWhenTimeoutExceeded() throws InterruptedException { - // Arrange - Prepare task - Runnable endlessLoop = () -> { - while(true) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - }; - task = executor.schedule(endlessLoop, 500, TimeUnit.MILLISECONDS); - - // Act - Setup timeout executor for cancelling - timeoutExecutor.scheduleCancelling(task, "test", restartTask, timeout); - // Wait for timeout to elapse - Thread.sleep(timeout.toMillis() + 100); - - // Assert - assertTrue(task.isCancelled()); - verify(restartTask, times(1)).run(); - } - - @Test - void shouldNotCancelTaskWhenNoTimeout() throws InterruptedException { - // Arrange - Prepare task - Runnable worker = () -> { - System.out.println("Work done"); - // Act 2 - Cancel timeout after work is done - timeoutExecutor.cancelTimeout(); - }; - task = executor.schedule(worker, 500, TimeUnit.MILLISECONDS); - - // Act 1 - Setup timeout executor for cancelling - timeoutExecutor.scheduleCancelling(task, "test", restartTask, timeout); - // Wait for timeout to elapse - Thread.sleep(timeout.toMillis() + 100); - - // 
Assert - assertFalse(task.isCancelled()); - verifyNoInteractions(restartTask); - } -} diff --git a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/utils/RetryUtilsTest.java b/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/utils/RetryUtilsTest.java index ccd5b2ad68..d161014d96 100644 --- a/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/utils/RetryUtilsTest.java +++ b/inspectit-ocelot-core/src/test/java/rocks/inspectit/ocelot/core/utils/RetryUtilsTest.java @@ -1,7 +1,8 @@ package rocks.inspectit.ocelot.core.utils; import io.github.resilience4j.retry.Retry; -import io.github.resilience4j.retry.RetryConfig; +import io.github.resilience4j.timelimiter.TimeLimiter; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import rocks.inspectit.ocelot.config.model.config.RetrySettings; @@ -12,41 +13,86 @@ class RetryUtilsTest { - @Test - void returnsRetryIfEnabled() { - RetrySettings retrySettings = new RetrySettings(); - retrySettings.setEnabled(true); - retrySettings.setMaxAttempts(5); - retrySettings.setInitialInterval(Duration.ofMillis(5000)); - retrySettings.setMultiplier(BigDecimal.TEN); - retrySettings.setRandomizationFactor(BigDecimal.valueOf(0.5)); + @Nested + class RetryTest { - Retry retry = RetryUtils.buildRetry(retrySettings, "anyName"); + @Test + void returnsRetryIfEnabled() { + RetrySettings retrySettings = new RetrySettings(); + retrySettings.setEnabled(true); + retrySettings.setMaxAttempts(5); + retrySettings.setInitialInterval(Duration.ofMillis(5000)); + retrySettings.setMultiplier(BigDecimal.TEN); + retrySettings.setRandomizationFactor(BigDecimal.valueOf(0.5)); - assertThat(retry).isNotNull(); - } + Retry retry = RetryUtils.buildRetry(retrySettings, "anyName"); - @Test - void returnsNullIfRetrySettingsIsNull() { - Retry retry = RetryUtils.buildRetry(null, "anyName"); + assertThat(retry).isNotNull(); + } - assertThat(retry).isNull(); - } + @Test + void returnsNullIfRetrySettingsIsNull() { + 
Retry retry = RetryUtils.buildRetry(null, "anyName"); + + assertThat(retry).isNull(); + } + + @Test + void returnsNullForDefaultRetrySettings() { + Retry retry = RetryUtils.buildRetry(new RetrySettings(), "anyName"); - @Test - void returnsNullForDefaultRetrySettings() { - Retry retry = RetryUtils.buildRetry(new RetrySettings(), "anyName"); + assertThat(retry).isNull(); + } - assertThat(retry).isNull(); + @Test + void returnsNullIfRetriesAreDisabled() { + RetrySettings retrySettings = new RetrySettings(); + retrySettings.setEnabled(false); + + Retry retry = RetryUtils.buildRetry(retrySettings, "anyName"); + + assertThat(retry).isNull(); + } } - @Test - void returnsNullIfRetriesAreDisabled() { - RetrySettings retrySettings = new RetrySettings(); - retrySettings.setEnabled(false); + @Nested + class TimeLimiterTest { + + @Test + void returnsTimeLimiterIfEnabled() { + Duration timeLimit = Duration.ofMillis(500); + RetrySettings retrySettings = new RetrySettings(); + retrySettings.setEnabled(true); + retrySettings.setTimeLimit(timeLimit); + + TimeLimiter timeLimiter = RetryUtils.buildTimeLimiter(retrySettings, "anyName"); + + assertThat(timeLimiter).isNotNull(); + assertThat(timeLimiter.getTimeLimiterConfig().getTimeoutDuration()).isEqualTo(timeLimit); + } + + @Test + void returnsNullIfRetrySettingsIsNull() { + TimeLimiter timeLimiter = RetryUtils.buildTimeLimiter(null, "anyName"); + + assertThat(timeLimiter).isNull(); + } + + @Test + void returnsNullForDefaultRetrySettings() { + TimeLimiter timeLimiter = RetryUtils.buildTimeLimiter(new RetrySettings(), "anyName"); + + assertThat(timeLimiter).isNull(); + } + + @Test + void returnsNullIfRetriesAreDisabled() { + RetrySettings retrySettings = new RetrySettings(); + retrySettings.setEnabled(false); - Retry retry = RetryUtils.buildRetry(retrySettings, "anyName"); + TimeLimiter timeLimiter = RetryUtils.buildTimeLimiter(retrySettings, "anyName"); - assertThat(retry).isNull(); + assertThat(timeLimiter).isNull(); + } } }