diff --git a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/metrics/TagGuardSettings.java b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/metrics/TagGuardSettings.java
index d382368815..cf1c05518a 100644
--- a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/metrics/TagGuardSettings.java
+++ b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/metrics/TagGuardSettings.java
@@ -23,7 +23,7 @@ public class TagGuardSettings {
private Duration scheduleDelay;
/**
- * File, which contains metrics with their particular recorded tags and their tag values
+ * File, which contains measures with their particular recorded tags and their tag values
*/
private String databaseFile;
@@ -39,9 +39,10 @@ public class TagGuardSettings {
/**
* Map containing max values per tag by Measure, e.g., {{'method_duration': 1337}}
- *
max-values-per-tag-by-measure:
- * - method_duration: 1337
- * - http_in_responestime: 2000
+ *
+ * max-values-per-tag-by-measure:
+ * method_duration: 1337
+ * http_in_responestime: 2000
*/
@NotNull
private Map maxValuesPerTagByMeasure = Collections.emptyMap();
diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java
index 3c717b2d0c..f3023e82ce 100644
--- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java
+++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java
@@ -20,6 +20,7 @@
import rocks.inspectit.ocelot.commons.models.health.AgentHealth;
import rocks.inspectit.ocelot.config.model.InspectitConfig;
import rocks.inspectit.ocelot.config.model.metrics.TagGuardSettings;
+import rocks.inspectit.ocelot.config.model.metrics.definition.MetricDefinitionSettings;
import rocks.inspectit.ocelot.core.config.InspectitEnvironment;
import rocks.inspectit.ocelot.core.instrumentation.context.InspectitContextImpl;
import rocks.inspectit.ocelot.core.instrumentation.hook.actions.IHookAction;
@@ -44,11 +45,8 @@
@Component
@Slf4j
public class MeasureTagValueGuard {
-
private static final String tagOverFlowMessageTemplate = "Overflow for tag %s";
- private static final String tagOverFlowResolvedMessageTemplate = "Overflow resolved for tag %s";
-
@Autowired
private InspectitEnvironment env;
@@ -62,14 +60,16 @@ public class MeasureTagValueGuard {
private CommonTagsManager commonTagsManager;
@Autowired
- private ScheduledExecutorService executorService;
+ private ScheduledExecutorService executor;
private PersistedTagsReaderWriter fileReaderWriter;
private volatile boolean isShuttingDown = false;
+ private boolean hasTagValueOverflow = false;
+
/**
- * Map of measure names and their related set of tag keys, which should be blocked.
+ * Map of measure names and their related set of tag keys, which are currently blocked.
*/
private final Map> blockedTagKeysByMeasure = Maps.newHashMap();
@@ -80,12 +80,9 @@ public class MeasureTagValueGuard {
@PostConstruct
protected void init() {
TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard();
- if (!tagGuardSettings.isEnabled()) {
- return;
- }
+ if (!tagGuardSettings.isEnabled()) return;
fileReaderWriter = new PersistedTagsReaderWriter(tagGuardSettings.getDatabaseFile(), new ObjectMapper());
- blockTagValuesTask.run();
scheduleTagGuardJob();
log.info(String.format("TagValueGuard started with scheduleDelay %s and database file %s", tagGuardSettings.getScheduleDelay(), tagGuardSettings.getDatabaseFile()));
@@ -93,14 +90,12 @@ protected void init() {
private void scheduleTagGuardJob() {
Duration tagGuardScheduleDelay = env.getCurrentConfig().getMetrics().getTagGuard().getScheduleDelay();
- blockTagValuesFuture = executorService.schedule(blockTagValuesTask, tagGuardScheduleDelay.toNanos(), TimeUnit.NANOSECONDS);
+ blockTagValuesFuture = executor.schedule(blockTagValuesTask, tagGuardScheduleDelay.toNanos(), TimeUnit.NANOSECONDS);
}
@PreDestroy
protected void stop() {
- if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) {
- return;
- }
+ if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return;
isShuttingDown = true;
blockTagValuesFuture.cancel(true);
@@ -112,20 +107,18 @@ protected void stop() {
* If new tags values have been created, they will be persisted.
*/
Runnable blockTagValuesTask = () -> {
+ if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return;
- if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) {
- return;
- }
+ // read current tag value database
+ Map>> availableTagsByMeasure = fileReaderWriter.read();
Set copy = latestTags;
latestTags = Collections.synchronizedSet(new HashSet<>());
- Map>> availableTagsByMeasure = fileReaderWriter.read();
-
+ // process new tags
copy.forEach(tagsHolder -> {
String measureName = tagsHolder.getMeasureName();
Map newTags = tagsHolder.getTags();
-
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());
Map> tagValuesByTagKey = availableTagsByMeasure.computeIfAbsent(measureName, k -> Maps.newHashMap());
@@ -133,13 +126,10 @@ protected void stop() {
Set tagValues = tagValuesByTagKey.computeIfAbsent(tagKey, (x) -> new HashSet<>());
// if tag value is new AND max values per tag is already reached
if (!tagValues.contains(tagValue) && tagValues.size() >= maxValuesPerTag) {
- blockedTagKeysByMeasure.computeIfAbsent(measureName, blocked -> Sets.newHashSet())
- .add(tagKey);
+ blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet()).add(tagKey);
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(), String.format(tagOverFlowMessageTemplate, tagKey));
+ hasTagValueOverflow = true;
} else {
- Set blockedTagKeys = blockedTagKeysByMeasure.computeIfAbsent(measureName, blocked -> Sets.newHashSet());
- if(blockedTagKeys.remove(tagKey))
- agentHealthManager.invalidateIncident(this.getClass(), String.format(tagOverFlowResolvedMessageTemplate, tagKey));
tagValues.add(tagValue);
}
});
@@ -148,26 +138,50 @@ protected void stop() {
fileReaderWriter.write(availableTagsByMeasure);
- if (!isShuttingDown) {
- scheduleTagGuardJob();
+ // remove all blocked tags, if no values are stored in the database file
+ if(availableTagsByMeasure.isEmpty()) blockedTagKeysByMeasure.clear();
+
+ // independent of processing new tags, check if tags should be blocked or unblocked due to their tag value limit
+ availableTagsByMeasure.forEach((measureName, tags) -> {
+ int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());
+ tags.forEach((tagKey, tagValues) -> {
+ if(tagValues.size() >= maxValuesPerTag) {
+ boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet())
+ .add(tagKey);
+ if(isNewBlockedTag) {
+ agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(), String.format(tagOverFlowMessageTemplate, tagKey));
+ hasTagValueOverflow = true;
+ }
+ } else {
+ blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet()).remove(tagKey);
+ }
+ });
+ });
+
+ // invalidate incident, if tag overflow was detected, but no more tags are blocked
+ boolean noBlockedTagKeys = blockedTagKeysByMeasure.values().stream().allMatch(Set::isEmpty);
+ if(hasTagValueOverflow && noBlockedTagKeys) {
+ agentHealthManager.invalidateIncident(this.getClass(), "Overflow for tags resolved");
+ hasTagValueOverflow = false;
}
+ if (!isShuttingDown) scheduleTagGuardJob();
};
/**
- * Gets the max value per tag for the given measure by hierarchically extracting {@link rocks.inspectit.ocelot.config.model.metrics.definition.MetricDefinitionSettings#maxValuesPerTag} (prio 1), {@link TagGuardSettings#maxValuesPerTagByMeasure} (prio 2) and {@link TagGuardSettings#maxValuesPerTag} (default).
+ * Gets the max value amount per tag for the given measure by hierarchically extracting
+ * {@link MetricDefinitionSettings#maxValuesPerTag} (prio 1),
+ * {@link TagGuardSettings#maxValuesPerTagByMeasure} (prio 2) and
+ * {@link TagGuardSettings#maxValuesPerTag} (default).
*
- * @param measureName
- *
- * @return
+ * @param measureName the current measure
+ * @return The maximum amount of tag values for the given measure
*/
@VisibleForTesting
int getMaxValuesPerTag(String measureName, InspectitConfig config) {
int maxValuesPerTag = config.getMetrics().getDefinitions().get(measureName).getMaxValuesPerTag();
- if (maxValuesPerTag > 0) {
- return maxValuesPerTag;
- }
+ if (maxValuesPerTag > 0) return maxValuesPerTag;
Map maxValuesPerTagPerMeasuresMap = config.getMetrics()
.getTagGuard()
@@ -178,18 +192,21 @@ int getMaxValuesPerTag(String measureName, InspectitConfig config) {
}
/**
- *
- * @param context
- * @param metricAccessor
- * @return
+ * Creates the full tag context, including all specified tags, for the current measure
+ * @param context current context
+ * @param metricAccessor accessor for the measure as well as the particular tags
+ * @return TagContext including all tags for the current measure
*/
public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAccessor metricAccessor) {
Map tags = Maps.newHashMap();
String measureName = metricAccessor.getName();
InspectitContextImpl inspectitContext = context.getInspectitContext();
- Set blockedTagKeys = blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet());
TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard();
+ Set blockedTagKeys = blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet());
+ log.debug("Currently blocked tag keys for measure {}, due to exceeding the configured tag value limit: {}",
+ measureName, blockedTagKeys);
+
// first common tags to allow to overwrite by constant or data tags
commonTagsManager.getCommonTagKeys().forEach(commonTagKey -> {
Optional.ofNullable(inspectitContext.getData(commonTagKey.getName()))
@@ -220,7 +237,7 @@ public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAcce
TagContextBuilder tagContextBuilder = Tags.getTagger().emptyBuilder();
tags.forEach((key, value) -> tagContextBuilder.putLocal(TagKey.create(key), TagUtils.createTagValue(key, value)));
- // Store the new tags for this measure as simple object and delay traversing trough tagKeys to async job
+ // store the new tags for this measure as simple object and delay traversing trough tagKeys to async job
latestTags.add(new TagsHolder(measureName, tags));
return tagContextBuilder.build();
@@ -256,10 +273,10 @@ public Map>> read() {
});
return tags;
} catch (Exception e) {
- log.error("Error loading tag value database from persistence file '{}'", fileName, e);
+ log.error("Error loading tag-guard database from persistence file '{}'", fileName, e);
}
} else {
- log.info("No tag value database available. Assuming first Agent deployment.");
+ log.info("Could not find tag-guard database file. File will be created during next write");
}
}
return Maps.newHashMap();
@@ -273,7 +290,7 @@ public void write(Map>> tagValues) {
String tagValuesString = mapper.writeValueAsString(tagValues);
Files.write(path, tagValuesString.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
- log.error("Error writing tag value database to file '{}'", fileName, e);
+ log.error("Error writing tag-guard database to file '{}'", fileName, e);
}
}
}
diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/listener/PollerWritingHealthEventListener.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/listener/PollerWritingHealthEventListener.java
index 69d8ba659b..0302ccae19 100644
--- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/listener/PollerWritingHealthEventListener.java
+++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/listener/PollerWritingHealthEventListener.java
@@ -1,12 +1,15 @@
package rocks.inspectit.ocelot.core.selfmonitoring.event.listener;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident;
import rocks.inspectit.ocelot.commons.models.health.AgentHealthState;
import rocks.inspectit.ocelot.core.config.propertysources.http.HttpConfigurationPoller;
import rocks.inspectit.ocelot.core.selfmonitoring.AgentHealthManager;
import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthChangedEvent;
+import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthIncidentAddedEvent;
import java.util.List;
@@ -22,7 +25,18 @@ public class PollerWritingHealthEventListener implements HealthEventListener {
@Override
public void onAgentHealthEvent(AgentHealthChangedEvent event) {
List incidentHistory = agentHealthManager.getHistory();
+
AgentHealthState state = new AgentHealthState(event.getNewHealth(), event.getSource().toString(), event.getMessage(), incidentHistory);
httpConfigurationPoller.updateAgentHealthState(state);
}
+
+ @Async
+ @EventListener
+ public void onAgentHealthIncidentEvent(AgentHealthIncidentAddedEvent event) {
+ List incidentHistory = event.getCurrentIncidents();
+ AgentHealthIncident latestIncident = incidentHistory.get(0);
+
+ AgentHealthState state = new AgentHealthState(latestIncident.getHealth(), latestIncident.getSource(), latestIncident.getMessage(), incidentHistory);
+ httpConfigurationPoller.updateAgentHealthState(state);
+ }
}
diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/models/AgentHealthIncidentAddedEvent.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/models/AgentHealthIncidentAddedEvent.java
new file mode 100644
index 0000000000..92769f5a8c
--- /dev/null
+++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/models/AgentHealthIncidentAddedEvent.java
@@ -0,0 +1,22 @@
+package rocks.inspectit.ocelot.core.selfmonitoring.event.models;
+
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident;
+import rocks.inspectit.ocelot.core.utils.AgentHealthIncidentBuffer;
+
+import java.util.List;
+
+/**
+ * Fired by {@link AgentHealthIncidentBuffer} whenever a new incident has been added.
+ */
+public class AgentHealthIncidentAddedEvent extends ApplicationEvent {
+
+ @Getter
+ private final List currentIncidents;
+
+ public AgentHealthIncidentAddedEvent(Object source, List currentIncidents) {
+ super(source);
+ this.currentIncidents = currentIncidents;
+ }
+}
diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/AgentHealthIncidentBuffer.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/AgentHealthIncidentBuffer.java
index ad095d297a..5a7fb3dc48 100644
--- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/AgentHealthIncidentBuffer.java
+++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/AgentHealthIncidentBuffer.java
@@ -1,16 +1,14 @@
package rocks.inspectit.ocelot.core.utils;
import lombok.RequiredArgsConstructor;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident;
import rocks.inspectit.ocelot.core.config.InspectitEnvironment;
+import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthIncidentAddedEvent;
import java.util.*;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
/**
* Buffer with queued AgentHealthIncidents.
@@ -20,10 +18,13 @@
@Component
@RequiredArgsConstructor
public class AgentHealthIncidentBuffer {
- private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>();
+
+ private final ApplicationContext ctx;
private final InspectitEnvironment env;
+ private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>();
+
/**
* Add new incident to the buffer.
* If the buffer is full, remove the latest incident at first.
@@ -33,7 +34,9 @@ public class AgentHealthIncidentBuffer {
public void put(AgentHealthIncident incident) {
int bufferSize = env.getCurrentConfig().getSelfMonitoring().getAgentHealth().getIncidentBufferSize();
while(buffer.size() >= bufferSize) buffer.poll();
+
buffer.offer(incident);
+ ctx.publishEvent(new AgentHealthIncidentAddedEvent(this, asList()));
}
/**