From 0afb11139c758f0d23684832803ac32ca7ad72d3 Mon Sep 17 00:00:00 2001 From: EddeCCC Date: Fri, 24 Nov 2023 11:29:24 +0100 Subject: [PATCH] improve blocking of tags --- .../model/metrics/TagGuardSettings.java | 9 +- .../core/metrics/MeasureTagValueGuard.java | 101 ++++++++++-------- .../PollerWritingHealthEventListener.java | 14 +++ .../models/AgentHealthIncidentAddedEvent.java | 22 ++++ .../core/utils/AgentHealthIncidentBuffer.java | 13 ++- 5 files changed, 108 insertions(+), 51 deletions(-) create mode 100644 inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/models/AgentHealthIncidentAddedEvent.java diff --git a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/metrics/TagGuardSettings.java b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/metrics/TagGuardSettings.java index d382368815..cf1c05518a 100644 --- a/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/metrics/TagGuardSettings.java +++ b/inspectit-ocelot-config/src/main/java/rocks/inspectit/ocelot/config/model/metrics/TagGuardSettings.java @@ -23,7 +23,7 @@ public class TagGuardSettings { private Duration scheduleDelay; /** - * File, which contains metrics with their particular recorded tags and their tag values + * File, which contains measures with their particular recorded tags and their tag values */ private String databaseFile; @@ -39,9 +39,10 @@ public class TagGuardSettings { /** * Map containing max values per tag by Measure, e.g., {{'method_duration': 1337}} - *
max-values-per-tag-by-measure: - * - method_duration: 1337
- * - http_in_responestime: 2000 + *
+ * max-values-per-tag-by-measure:
+ * method_duration: 1337
+ * http_in_responestime: 2000 */ @NotNull private Map maxValuesPerTagByMeasure = Collections.emptyMap(); diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java index 3c717b2d0c..f3023e82ce 100644 --- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java +++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java @@ -20,6 +20,7 @@ import rocks.inspectit.ocelot.commons.models.health.AgentHealth; import rocks.inspectit.ocelot.config.model.InspectitConfig; import rocks.inspectit.ocelot.config.model.metrics.TagGuardSettings; +import rocks.inspectit.ocelot.config.model.metrics.definition.MetricDefinitionSettings; import rocks.inspectit.ocelot.core.config.InspectitEnvironment; import rocks.inspectit.ocelot.core.instrumentation.context.InspectitContextImpl; import rocks.inspectit.ocelot.core.instrumentation.hook.actions.IHookAction; @@ -44,11 +45,8 @@ @Component @Slf4j public class MeasureTagValueGuard { - private static final String tagOverFlowMessageTemplate = "Overflow for tag %s"; - private static final String tagOverFlowResolvedMessageTemplate = "Overflow resolved for tag %s"; - @Autowired private InspectitEnvironment env; @@ -62,14 +60,16 @@ public class MeasureTagValueGuard { private CommonTagsManager commonTagsManager; @Autowired - private ScheduledExecutorService executorService; + private ScheduledExecutorService executor; private PersistedTagsReaderWriter fileReaderWriter; private volatile boolean isShuttingDown = false; + private boolean hasTagValueOverflow = false; + /** - * Map of measure names and their related set of tag keys, which should be blocked. + * Map of measure names and their related set of tag keys, which are currently blocked. */ private final Map> blockedTagKeysByMeasure = Maps.newHashMap(); @@ -80,12 +80,9 @@ public class MeasureTagValueGuard { @PostConstruct protected void init() { TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard(); - if (!tagGuardSettings.isEnabled()) { - return; - } + if (!tagGuardSettings.isEnabled()) return; fileReaderWriter = new PersistedTagsReaderWriter(tagGuardSettings.getDatabaseFile(), new ObjectMapper()); - blockTagValuesTask.run(); scheduleTagGuardJob(); log.info(String.format("TagValueGuard started with scheduleDelay %s and database file %s", tagGuardSettings.getScheduleDelay(), tagGuardSettings.getDatabaseFile())); @@ -93,14 +90,12 @@ protected void init() { private void scheduleTagGuardJob() { Duration tagGuardScheduleDelay = env.getCurrentConfig().getMetrics().getTagGuard().getScheduleDelay(); - blockTagValuesFuture = executorService.schedule(blockTagValuesTask, tagGuardScheduleDelay.toNanos(), TimeUnit.NANOSECONDS); + blockTagValuesFuture = executor.schedule(blockTagValuesTask, tagGuardScheduleDelay.toNanos(), TimeUnit.NANOSECONDS); } @PreDestroy protected void stop() { - if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) { - return; - } + if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return; isShuttingDown = true; blockTagValuesFuture.cancel(true); @@ -112,20 +107,18 @@ protected void stop() { * If new tags values have been created, they will be persisted. */ Runnable blockTagValuesTask = () -> { + if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return; - if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) { - return; - } + // read current tag value database + Map>> availableTagsByMeasure = fileReaderWriter.read(); Set copy = latestTags; latestTags = Collections.synchronizedSet(new HashSet<>()); - Map>> availableTagsByMeasure = fileReaderWriter.read(); - + // process new tags copy.forEach(tagsHolder -> { String measureName = tagsHolder.getMeasureName(); Map newTags = tagsHolder.getTags(); - int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig()); Map> tagValuesByTagKey = availableTagsByMeasure.computeIfAbsent(measureName, k -> Maps.newHashMap()); @@ -133,13 +126,10 @@ protected void stop() { Set tagValues = tagValuesByTagKey.computeIfAbsent(tagKey, (x) -> new HashSet<>()); // if tag value is new AND max values per tag is already reached if (!tagValues.contains(tagValue) && tagValues.size() >= maxValuesPerTag) { - blockedTagKeysByMeasure.computeIfAbsent(measureName, blocked -> Sets.newHashSet()) - .add(tagKey); + blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet()).add(tagKey); agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(), String.format(tagOverFlowMessageTemplate, tagKey)); + hasTagValueOverflow = true; } else { - Set blockedTagKeys = blockedTagKeysByMeasure.computeIfAbsent(measureName, blocked -> Sets.newHashSet()); - if(blockedTagKeys.remove(tagKey)) - agentHealthManager.invalidateIncident(this.getClass(), String.format(tagOverFlowResolvedMessageTemplate, tagKey)); tagValues.add(tagValue); } }); @@ -148,26 +138,50 @@ protected void stop() { fileReaderWriter.write(availableTagsByMeasure); - if (!isShuttingDown) { - scheduleTagGuardJob(); + // remove all blocked tags, if no values are stored in the database file + if(availableTagsByMeasure.isEmpty()) blockedTagKeysByMeasure.clear(); + + // independent of processing new tags, check if tags should be blocked or unblocked due to their tag value limit + availableTagsByMeasure.forEach((measureName, tags) -> { + int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig()); + tags.forEach((tagKey, tagValues) -> { + if(tagValues.size() >= maxValuesPerTag) { + boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet()) + .add(tagKey); + if(isNewBlockedTag) { + agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(), String.format(tagOverFlowMessageTemplate, tagKey)); + hasTagValueOverflow = true; + } + } else { + blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet()).remove(tagKey); + } + }); + }); + + // invalidate incident, if tag overflow was detected, but no more tags are blocked + boolean noBlockedTagKeys = blockedTagKeysByMeasure.values().stream().allMatch(Set::isEmpty); + if(hasTagValueOverflow && noBlockedTagKeys) { + agentHealthManager.invalidateIncident(this.getClass(), "Overflow for tags resolved"); + hasTagValueOverflow = false; } + if (!isShuttingDown) scheduleTagGuardJob(); }; /** - * Gets the max value per tag for the given measure by hierarchically extracting {@link rocks.inspectit.ocelot.config.model.metrics.definition.MetricDefinitionSettings#maxValuesPerTag} (prio 1), {@link TagGuardSettings#maxValuesPerTagByMeasure} (prio 2) and {@link TagGuardSettings#maxValuesPerTag} (default). + * Gets the max value amount per tag for the given measure by hierarchically extracting + * {@link MetricDefinitionSettings#maxValuesPerTag} (prio 1), + * {@link TagGuardSettings#maxValuesPerTagByMeasure} (prio 2) and + * {@link TagGuardSettings#maxValuesPerTag} (default). * - * @param measureName - * - * @return + * @param measureName the current measure + * @return The maximum amount of tag values for the given measure */ @VisibleForTesting int getMaxValuesPerTag(String measureName, InspectitConfig config) { int maxValuesPerTag = config.getMetrics().getDefinitions().get(measureName).getMaxValuesPerTag(); - if (maxValuesPerTag > 0) { - return maxValuesPerTag; - } + if (maxValuesPerTag > 0) return maxValuesPerTag; Map maxValuesPerTagPerMeasuresMap = config.getMetrics() .getTagGuard() @@ -178,18 +192,21 @@ int getMaxValuesPerTag(String measureName, InspectitConfig config) { } /** - * - * @param context - * @param metricAccessor - * @return + * Creates the full tag context, including all specified tags, for the current measure + * @param context current context + * @param metricAccessor accessor for the measure as well as the particular tags + * @return TagContext including all tags for the current measure */ public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAccessor metricAccessor) { Map tags = Maps.newHashMap(); String measureName = metricAccessor.getName(); InspectitContextImpl inspectitContext = context.getInspectitContext(); - Set blockedTagKeys = blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet()); TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard(); + Set blockedTagKeys = blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet()); + log.debug("Currently blocked tag keys for measure {}, due to exceeding the configured tag value limit: {}", + measureName, blockedTagKeys); + // first common tags to allow to overwrite by constant or data tags commonTagsManager.getCommonTagKeys().forEach(commonTagKey -> { Optional.ofNullable(inspectitContext.getData(commonTagKey.getName())) @@ -220,7 +237,7 @@ public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAcce TagContextBuilder tagContextBuilder = Tags.getTagger().emptyBuilder(); tags.forEach((key, value) -> tagContextBuilder.putLocal(TagKey.create(key), TagUtils.createTagValue(key, value))); - // Store the new tags for this measure as simple object and delay traversing trough tagKeys to async job + // store the new tags for this measure as simple object and delay traversing trough tagKeys to async job latestTags.add(new TagsHolder(measureName, tags)); return tagContextBuilder.build(); @@ -256,10 +273,10 @@ public Map>> read() { }); return tags; } catch (Exception e) { - log.error("Error loading tag value database from persistence file '{}'", fileName, e); + log.error("Error loading tag-guard database from persistence file '{}'", fileName, e); } } else { - log.info("No tag value database available. Assuming first Agent deployment."); + log.info("Could not find tag-guard database file. File will be created during next write"); } } return Maps.newHashMap(); @@ -273,7 +290,7 @@ public void write(Map>> tagValues) { String tagValuesString = mapper.writeValueAsString(tagValues); Files.write(path, tagValuesString.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { - log.error("Error writing tag value database to file '{}'", fileName, e); + log.error("Error writing tag-guard database to file '{}'", fileName, e); } } } diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/listener/PollerWritingHealthEventListener.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/listener/PollerWritingHealthEventListener.java index 69d8ba659b..0302ccae19 100644 --- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/listener/PollerWritingHealthEventListener.java +++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/listener/PollerWritingHealthEventListener.java @@ -1,12 +1,15 @@ package rocks.inspectit.ocelot.core.selfmonitoring.event.listener; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident; import rocks.inspectit.ocelot.commons.models.health.AgentHealthState; import rocks.inspectit.ocelot.core.config.propertysources.http.HttpConfigurationPoller; import rocks.inspectit.ocelot.core.selfmonitoring.AgentHealthManager; import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthChangedEvent; +import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthIncidentAddedEvent; import java.util.List; @@ -22,7 +25,18 @@ public class PollerWritingHealthEventListener implements HealthEventListener { @Override public void onAgentHealthEvent(AgentHealthChangedEvent event) { List incidentHistory = agentHealthManager.getHistory(); + AgentHealthState state = new AgentHealthState(event.getNewHealth(), event.getSource().toString(), event.getMessage(), incidentHistory); httpConfigurationPoller.updateAgentHealthState(state); } + + @Async + @EventListener + public void onAgentHealthIncidentEvent(AgentHealthIncidentAddedEvent event) { + List incidentHistory = event.getCurrentIncidents(); + AgentHealthIncident latestIncident = incidentHistory.get(0); + + AgentHealthState state = new AgentHealthState(latestIncident.getHealth(), latestIncident.getSource(), latestIncident.getMessage(), incidentHistory); + httpConfigurationPoller.updateAgentHealthState(state); + } } diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/models/AgentHealthIncidentAddedEvent.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/models/AgentHealthIncidentAddedEvent.java new file mode 100644 index 0000000000..92769f5a8c --- /dev/null +++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/event/models/AgentHealthIncidentAddedEvent.java @@ -0,0 +1,22 @@ +package rocks.inspectit.ocelot.core.selfmonitoring.event.models; + +import lombok.Getter; +import org.springframework.context.ApplicationEvent; +import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident; +import rocks.inspectit.ocelot.core.utils.AgentHealthIncidentBuffer; + +import java.util.List; + +/** + * Fired by {@link AgentHealthIncidentBuffer} whenever a new incident has been added. + */ +public class AgentHealthIncidentAddedEvent extends ApplicationEvent { + + @Getter + private final List currentIncidents; + + public AgentHealthIncidentAddedEvent(Object source, List currentIncidents) { + super(source); + this.currentIncidents = currentIncidents; + } +} diff --git a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/AgentHealthIncidentBuffer.java b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/AgentHealthIncidentBuffer.java index ad095d297a..5a7fb3dc48 100644 --- a/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/AgentHealthIncidentBuffer.java +++ b/inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/utils/AgentHealthIncidentBuffer.java @@ -1,16 +1,14 @@ package rocks.inspectit.ocelot.core.utils; import lombok.RequiredArgsConstructor; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident; import rocks.inspectit.ocelot.core.config.InspectitEnvironment; +import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthIncidentAddedEvent; import java.util.*; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; /** * Buffer with queued AgentHealthIncidents. @@ -20,10 +18,13 @@ @Component @RequiredArgsConstructor public class AgentHealthIncidentBuffer { - private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); + + private final ApplicationContext ctx; private final InspectitEnvironment env; + private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); + /** * Add new incident to the buffer. * If the buffer is full, remove the latest incident at first. @@ -33,7 +34,9 @@ public class AgentHealthIncidentBuffer { public void put(AgentHealthIncident incident) { int bufferSize = env.getCurrentConfig().getSelfMonitoring().getAgentHealth().getIncidentBufferSize(); while(buffer.size() >= bufferSize) buffer.poll(); + buffer.offer(incident); + ctx.publishEvent(new AgentHealthIncidentAddedEvent(this, asList())); } /**