Skip to content

Commit

Permalink
improve blocking of tags
Browse files Browse the repository at this point in the history
  • Loading branch information
EddeCCC committed Nov 24, 2023
1 parent ddd0dfd commit 0afb111
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class TagGuardSettings {
private Duration scheduleDelay;

/**
* File, which contains metrics with their particular recorded tags and their tag values
* File, which contains measures with their particular recorded tags and their tag values
*/
private String databaseFile;

Expand All @@ -39,9 +39,10 @@ public class TagGuardSettings {

/**
* Map containing max values per tag by Measure, e.g., {{'method_duration': 1337}}
* <br> max-values-per-tag-by-measure:
* - method_duration: 1337 <br>
* - http_in_responsetime: 2000
* <br>
* max-values-per-tag-by-measure: <br>
* method_duration: 1337 <br>
* http_in_responsetime: 2000
*/
@NotNull
private Map<String, Integer> maxValuesPerTagByMeasure = Collections.emptyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import rocks.inspectit.ocelot.commons.models.health.AgentHealth;
import rocks.inspectit.ocelot.config.model.InspectitConfig;
import rocks.inspectit.ocelot.config.model.metrics.TagGuardSettings;
import rocks.inspectit.ocelot.config.model.metrics.definition.MetricDefinitionSettings;
import rocks.inspectit.ocelot.core.config.InspectitEnvironment;
import rocks.inspectit.ocelot.core.instrumentation.context.InspectitContextImpl;
import rocks.inspectit.ocelot.core.instrumentation.hook.actions.IHookAction;
Expand All @@ -44,11 +45,8 @@
@Component
@Slf4j
public class MeasureTagValueGuard {

private static final String tagOverFlowMessageTemplate = "Overflow for tag %s";

private static final String tagOverFlowResolvedMessageTemplate = "Overflow resolved for tag %s";

@Autowired
private InspectitEnvironment env;

Expand All @@ -62,14 +60,16 @@ public class MeasureTagValueGuard {
private CommonTagsManager commonTagsManager;

@Autowired
private ScheduledExecutorService executorService;
private ScheduledExecutorService executor;

private PersistedTagsReaderWriter fileReaderWriter;

private volatile boolean isShuttingDown = false;

private boolean hasTagValueOverflow = false;

/**
* Map of measure names and their related set of tag keys, which should be blocked.
* Map of measure names and their related set of tag keys, which are currently blocked.
*/
private final Map<String, Set<String>> blockedTagKeysByMeasure = Maps.newHashMap();

Expand All @@ -80,27 +80,22 @@ public class MeasureTagValueGuard {
@PostConstruct
protected void init() {
TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard();
if (!tagGuardSettings.isEnabled()) {
return;
}
if (!tagGuardSettings.isEnabled()) return;

fileReaderWriter = new PersistedTagsReaderWriter(tagGuardSettings.getDatabaseFile(), new ObjectMapper());
blockTagValuesTask.run();
scheduleTagGuardJob();

log.info(String.format("TagValueGuard started with scheduleDelay %s and database file %s", tagGuardSettings.getScheduleDelay(), tagGuardSettings.getDatabaseFile()));
}

private void scheduleTagGuardJob() {
Duration tagGuardScheduleDelay = env.getCurrentConfig().getMetrics().getTagGuard().getScheduleDelay();
blockTagValuesFuture = executorService.schedule(blockTagValuesTask, tagGuardScheduleDelay.toNanos(), TimeUnit.NANOSECONDS);
blockTagValuesFuture = executor.schedule(blockTagValuesTask, tagGuardScheduleDelay.toNanos(), TimeUnit.NANOSECONDS);
}

@PreDestroy
protected void stop() {
if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) {
return;
}
if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return;

isShuttingDown = true;
blockTagValuesFuture.cancel(true);
Expand All @@ -112,34 +107,29 @@ protected void stop() {
* If new tag values have been created, they will be persisted.
*/
Runnable blockTagValuesTask = () -> {
if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return;

if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) {
return;
}
// read current tag value database
Map<String, Map<String, Set<String>>> availableTagsByMeasure = fileReaderWriter.read();

Set<TagsHolder> copy = latestTags;
latestTags = Collections.synchronizedSet(new HashSet<>());

Map<String, Map<String, Set<String>>> availableTagsByMeasure = fileReaderWriter.read();

// process new tags
copy.forEach(tagsHolder -> {
String measureName = tagsHolder.getMeasureName();
Map<String, String> newTags = tagsHolder.getTags();

int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());

Map<String, Set<String>> tagValuesByTagKey = availableTagsByMeasure.computeIfAbsent(measureName, k -> Maps.newHashMap());
newTags.forEach((tagKey, tagValue) -> {
Set<String> tagValues = tagValuesByTagKey.computeIfAbsent(tagKey, (x) -> new HashSet<>());
// if tag value is new AND max values per tag is already reached
if (!tagValues.contains(tagValue) && tagValues.size() >= maxValuesPerTag) {
blockedTagKeysByMeasure.computeIfAbsent(measureName, blocked -> Sets.newHashSet())
.add(tagKey);
blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet()).add(tagKey);
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(), String.format(tagOverFlowMessageTemplate, tagKey));
hasTagValueOverflow = true;

Check warning on line 131 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L129-L131

Added lines #L129 - L131 were not covered by tests
} else {
Set<String> blockedTagKeys = blockedTagKeysByMeasure.computeIfAbsent(measureName, blocked -> Sets.newHashSet());
if(blockedTagKeys.remove(tagKey))
agentHealthManager.invalidateIncident(this.getClass(), String.format(tagOverFlowResolvedMessageTemplate, tagKey));
tagValues.add(tagValue);
}
});
Expand All @@ -148,26 +138,50 @@ protected void stop() {

fileReaderWriter.write(availableTagsByMeasure);

if (!isShuttingDown) {
scheduleTagGuardJob();
// remove all blocked tags, if no values are stored in the database file
if(availableTagsByMeasure.isEmpty()) blockedTagKeysByMeasure.clear();

// independent of processing new tags, check if tags should be blocked or unblocked due to their tag value limit
availableTagsByMeasure.forEach((measureName, tags) -> {
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());
tags.forEach((tagKey, tagValues) -> {
if(tagValues.size() >= maxValuesPerTag) {
boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet())
.add(tagKey);

Check warning on line 150 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L149-L150

Added lines #L149 - L150 were not covered by tests
if(isNewBlockedTag) {
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(), String.format(tagOverFlowMessageTemplate, tagKey));
hasTagValueOverflow = true;

Check warning on line 153 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L152-L153

Added lines #L152 - L153 were not covered by tests
}
} else {

Check warning on line 155 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L155

Added line #L155 was not covered by tests
blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet()).remove(tagKey);
}
});
});

// invalidate incident, if tag overflow was detected, but no more tags are blocked
boolean noBlockedTagKeys = blockedTagKeysByMeasure.values().stream().allMatch(Set::isEmpty);
if(hasTagValueOverflow && noBlockedTagKeys) {
agentHealthManager.invalidateIncident(this.getClass(), "Overflow for tags resolved");
hasTagValueOverflow = false;

Check warning on line 165 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L164-L165

Added lines #L164 - L165 were not covered by tests
}

if (!isShuttingDown) scheduleTagGuardJob();
};

/**
* Gets the max value per tag for the given measure by hierarchically extracting {@link rocks.inspectit.ocelot.config.model.metrics.definition.MetricDefinitionSettings#maxValuesPerTag} (prio 1), {@link TagGuardSettings#maxValuesPerTagByMeasure} (prio 2) and {@link TagGuardSettings#maxValuesPerTag} (default).
* Gets the max value amount per tag for the given measure by hierarchically extracting
* {@link MetricDefinitionSettings#maxValuesPerTag} (prio 1),
* {@link TagGuardSettings#maxValuesPerTagByMeasure} (prio 2) and
* {@link TagGuardSettings#maxValuesPerTag} (default).
*
* @param measureName
*
* @return
* @param measureName the current measure
* @return The maximum amount of tag values for the given measure
*/
@VisibleForTesting
int getMaxValuesPerTag(String measureName, InspectitConfig config) {
int maxValuesPerTag = config.getMetrics().getDefinitions().get(measureName).getMaxValuesPerTag();

if (maxValuesPerTag > 0) {
return maxValuesPerTag;
}
if (maxValuesPerTag > 0) return maxValuesPerTag;

Map<String, Integer> maxValuesPerTagPerMeasuresMap = config.getMetrics()
.getTagGuard()
Expand All @@ -178,18 +192,21 @@ int getMaxValuesPerTag(String measureName, InspectitConfig config) {
}

/**
*
* @param context
* @param metricAccessor
* @return
* Creates the full tag context, including all specified tags, for the current measure
* @param context current context
* @param metricAccessor accessor for the measure as well as the particular tags
* @return TagContext including all tags for the current measure
*/
public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAccessor metricAccessor) {
Map<String, String> tags = Maps.newHashMap();
String measureName = metricAccessor.getName();
InspectitContextImpl inspectitContext = context.getInspectitContext();
Set<String> blockedTagKeys = blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet());
TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard();

Set<String> blockedTagKeys = blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet());
log.debug("Currently blocked tag keys for measure {}, due to exceeding the configured tag value limit: {}",
measureName, blockedTagKeys);

// first common tags to allow to overwrite by constant or data tags
commonTagsManager.getCommonTagKeys().forEach(commonTagKey -> {
Optional.ofNullable(inspectitContext.getData(commonTagKey.getName()))
Expand Down Expand Up @@ -220,7 +237,7 @@ public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAcce
TagContextBuilder tagContextBuilder = Tags.getTagger().emptyBuilder();
tags.forEach((key, value) -> tagContextBuilder.putLocal(TagKey.create(key), TagUtils.createTagValue(key, value)));

// Store the new tags for this measure as simple object and delay traversing trough tagKeys to async job
// store the new tags for this measure as simple object and delay traversing through tagKeys to async job
latestTags.add(new TagsHolder(measureName, tags));

return tagContextBuilder.build();
Expand Down Expand Up @@ -256,10 +273,10 @@ public Map<String, Map<String, Set<String>>> read() {
});
return tags;
} catch (Exception e) {
log.error("Error loading tag value database from persistence file '{}'", fileName, e);
log.error("Error loading tag-guard database from persistence file '{}'", fileName, e);
}

Check warning on line 277 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L275-L277

Added lines #L275 - L277 were not covered by tests
} else {
log.info("No tag value database available. Assuming first Agent deployment.");
log.info("Could not find tag-guard database file. File will be created during next write");
}
}
return Maps.newHashMap();
Expand All @@ -273,7 +290,7 @@ public void write(Map<String, Map<String, Set<String>>> tagValues) {
String tagValuesString = mapper.writeValueAsString(tagValues);
Files.write(path, tagValuesString.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
log.error("Error writing tag value database to file '{}'", fileName, e);
log.error("Error writing tag-guard database to file '{}'", fileName, e);

Check warning on line 293 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L292-L293

Added lines #L292 - L293 were not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package rocks.inspectit.ocelot.core.selfmonitoring.event.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident;
import rocks.inspectit.ocelot.commons.models.health.AgentHealthState;
import rocks.inspectit.ocelot.core.config.propertysources.http.HttpConfigurationPoller;
import rocks.inspectit.ocelot.core.selfmonitoring.AgentHealthManager;
import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthChangedEvent;
import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthIncidentAddedEvent;

import java.util.List;

Expand All @@ -22,7 +25,18 @@ public class PollerWritingHealthEventListener implements HealthEventListener {
@Override
public void onAgentHealthEvent(AgentHealthChangedEvent event) {
    // Incident history as currently reported by the health manager.
    final List<AgentHealthIncident> history = agentHealthManager.getHistory();

    // Forward the new health, the event's source and message, and the history to the HTTP poller.
    httpConfigurationPoller.updateAgentHealthState(
            new AgentHealthState(event.getNewHealth(), event.getSource().toString(), event.getMessage(), history));
}

/**
 * Builds an {@link AgentHealthState} from the first incident of the event's incident list
 * and hands it, together with the complete list, to the HTTP configuration poller.
 * Events carrying no incidents are ignored.
 *
 * @param event the event holding the current incidents
 */
@Async
@EventListener
public void onAgentHealthIncidentEvent(AgentHealthIncidentAddedEvent event) {
    List<AgentHealthIncident> incidentHistory = event.getCurrentIncidents();

    // Without at least one incident there is nothing to report; this also prevents
    // get(0) below from throwing on an empty (or missing) list.
    if (incidentHistory == null || incidentHistory.isEmpty()) {
        return;
    }

    // NOTE(review): index 0 is treated as the most recent incident — confirm the ordering
    // of the list supplied by the publisher.
    AgentHealthIncident latestIncident = incidentHistory.get(0);

    AgentHealthState state = new AgentHealthState(latestIncident.getHealth(), latestIncident.getSource(), latestIncident.getMessage(), incidentHistory);
    httpConfigurationPoller.updateAgentHealthState(state);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package rocks.inspectit.ocelot.core.selfmonitoring.event.models;

import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident;
import rocks.inspectit.ocelot.core.utils.AgentHealthIncidentBuffer;

import java.util.List;

/**
 * Application event published by {@link AgentHealthIncidentBuffer} each time a new incident
 * has been added. It carries the list of incidents supplied by the publisher.
 */
@Getter
public class AgentHealthIncidentAddedEvent extends ApplicationEvent {

    /** Incidents passed in by the publisher when the event was created. */
    private final List<AgentHealthIncident> currentIncidents;

    /**
     * @param source           the object publishing this event
     * @param currentIncidents the incidents to expose via {@code getCurrentIncidents()};
     *                         the list is stored as given, without copying
     */
    public AgentHealthIncidentAddedEvent(Object source, List<AgentHealthIncident> currentIncidents) {
        super(source);
        this.currentIncidents = currentIncidents;
    }
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package rocks.inspectit.ocelot.core.utils;

import lombok.RequiredArgsConstructor;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident;
import rocks.inspectit.ocelot.core.config.InspectitEnvironment;
import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthIncidentAddedEvent;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* Buffer with queued AgentHealthIncidents.
Expand All @@ -20,10 +18,13 @@
@Component
@RequiredArgsConstructor
public class AgentHealthIncidentBuffer {
private final ConcurrentLinkedQueue<AgentHealthIncident> buffer = new ConcurrentLinkedQueue<>();

private final ApplicationContext ctx;

private final InspectitEnvironment env;

private final ConcurrentLinkedQueue<AgentHealthIncident> buffer = new ConcurrentLinkedQueue<>();

/**
* Add new incident to the buffer.
* If the buffer is full, remove the oldest incident first.
Expand All @@ -33,7 +34,9 @@ public class AgentHealthIncidentBuffer {
public void put(AgentHealthIncident incident) {
    int bufferSize = env.getCurrentConfig().getSelfMonitoring().getAgentHealth().getIncidentBufferSize();

    // Make room for the new incident by removing entries from the head of the queue.
    // poll() returns null once the queue is empty; stop in that case, because with a
    // non-positive configured size the condition "size >= bufferSize" never becomes
    // false and the loop would otherwise spin forever.
    while (buffer.size() >= bufferSize) {
        if (buffer.poll() == null) {
            break;
        }
    }

    buffer.offer(incident);
    // Notify listeners, passing the buffer content as returned by asList().
    ctx.publishEvent(new AgentHealthIncidentAddedEvent(this, asList()));
}

/**
Expand Down

0 comments on commit 0afb111

Please sign in to comment.