Skip to content

Commit

Permalink
refactor TagValueGuard and AgentHealth tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
EddeCCC committed Nov 23, 2023
1 parent bf49b65 commit ddd0dfd
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ class Menubar extends React.Component {
}
right={
<div className="p-toolbar-group-right flex-v-center">
<label className="documentation-icon">
<i className="pi pi-info-circle" onClick={this.openDocumentation} title="Open Documentation"></i>
</label>
<label className="documentation-icon">
<i className="pi pi-info-circle" onClick={this.openDocumentation} title="Open Documentation"></i>
</label>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AgentHealthIncident implements Comparable{
public class AgentHealthIncident implements Comparable<Object> {

private String time;
private AgentHealth health;
private String source;
private String message;
private boolean changedHealth;

public static AgentHealthIncident getNoneIncident() {
return new AgentHealthIncident(LocalDateTime.now().toString(), AgentHealth.OK, "", "", false);
}

@Override
public int compareTo(Object o) {
if(!(o instanceof AgentHealthIncident)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class MetricsSettings {
*/
private Duration frequency;

/**
* Settings for controlling the amount of unique tag values
*/
private TagGuardSettings tagGuard;

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@
@AdditionalValidations
public class TagGuardSettings {

private boolean enabled;

/**
* The schedule delay of the {@code TagValueGuard}
* The schedule delay for the blocking task of the {@code MeasureTagValueGuard}
*/
private Duration scheduleDelay;

/**
*
* File, which contains metrics with their particular recorded tags and their tag values
*/
private String databaseFile;


/**
* String, which should be used as tag value, if the defined limit of tag values is exceeded
*/
private String overflowReplacement;

private boolean enabled;

/**
* Default max values per tag for all measures that are not specified in {@link #maxValuesPerTagByMeasure} or {@link rocks.inspectit.ocelot.config.model.metrics.definition.MetricDefinitionSettings#maxValuesPerTag}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,29 @@ public class AgentHealthSettings {
@NonNull
private Duration validityPeriod;

/**
* The amount of AgentHealthIncidents, which should be buffered.
*/
@NonNull
private Integer incidentBufferSize;

/**
* The minimum delay how often the AgentHealthManager checks for invalid agent health events to clear health status.
*/
@NonNull
private Duration minHealthCheckDelay;

@AssertTrue(message = "minHealthCheckDelay must be at least 60 seconds")
@AssertTrue(message = "validityPeriod must be greater than minHealthCheckDelay!")
public boolean validityPeriodIsGreaterThanMinDelay() {
return validityPeriod.compareTo(minHealthCheckDelay) > 0;
}

@AssertTrue(message = "minHealthCheckDelay must be at least 60 seconds!")
public boolean isMin60SecondsDelay() {
return minHealthCheckDelay.toMinutes() >= 1;
}

@AssertFalse(message = "The specified period should not be negative!")
@AssertFalse(message = "The specified period must not be negative!")
public boolean isNegativeDuration() {
return validityPeriod != null && validityPeriod.isNegative();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ inspectit:
# health changes due to instrumentation errors are valid until the next re-instrumentation
validity-period: 1h

# The amount of agent health incidents, which should be buffered
incident-buffer-size: 10

# The minimum delay how often the AgentHealthManager checks for invalid agent health events to clear health status
# By default the delay is calculated based on the last agent health event
# Minimum value is 1m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.concurrent.TimeUnit;

/**
* Service for continuously triggering the updated of a agent configuration via HTTP.
* Service for continuously triggering the updating of an agent configuration via HTTP.
*/
@Service
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagContextBuilder;
import io.opencensus.tags.TagKey;
Expand Down Expand Up @@ -46,6 +47,8 @@ public class MeasureTagValueGuard {

private static final String tagOverFlowMessageTemplate = "Overflow for tag %s";

private static final String tagOverFlowResolvedMessageTemplate = "Overflow resolved for tag %s";

@Autowired
private InspectitEnvironment env;

Expand All @@ -65,7 +68,10 @@ public class MeasureTagValueGuard {

private volatile boolean isShuttingDown = false;

private final Map<String, Map<String, Boolean>> blockedTagKeysByMeasure = Maps.newHashMap();
/**
* Map of measure names and their related set of tag keys, which should be blocked.
*/
private final Map<String, Set<String>> blockedTagKeysByMeasure = Maps.newHashMap();

private Set<TagsHolder> latestTags = Collections.synchronizedSet(new HashSet<>());

Expand Down Expand Up @@ -100,6 +106,11 @@ protected void stop() {
blockTagValuesFuture.cancel(true);
}

/**
* Task, which reads the persisted tag values to determine, which tags should be blocked, because of exceeding
* the specific tag value limit.
* If new tags values have been created, they will be persisted.
*/
Runnable blockTagValuesTask = () -> {

if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) {
Expand All @@ -111,20 +122,24 @@ protected void stop() {

Map<String, Map<String, Set<String>>> availableTagsByMeasure = fileReaderWriter.read();

copy.forEach(t -> {
String measureName = t.getMeasureName();
Map<String, String> newTags = t.getTags();
copy.forEach(tagsHolder -> {
String measureName = tagsHolder.getMeasureName();
Map<String, String> newTags = tagsHolder.getTags();

int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());

Map<String, Set<String>> tagValuesByTagKey = availableTagsByMeasure.computeIfAbsent(measureName, k -> Maps.newHashMap());
newTags.forEach((tagKey, tagValue) -> {
Set<String> tagValues = tagValuesByTagKey.computeIfAbsent(tagKey, (x) -> new HashSet<>());
if (tagValues.size() > maxValuesPerTag) {
blockedTagKeysByMeasure.computeIfAbsent(tagKey, blocked -> Maps.newHashMap())
.putIfAbsent(tagKey, true);
// if tag value is new AND max values per tag is already reached
if (!tagValues.contains(tagValue) && tagValues.size() >= maxValuesPerTag) {
blockedTagKeysByMeasure.computeIfAbsent(measureName, blocked -> Sets.newHashSet())
.add(tagKey);
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(), String.format(tagOverFlowMessageTemplate, tagKey));

Check warning on line 138 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L136-L138

Added lines #L136 - L138 were not covered by tests
} else {
Set<String> blockedTagKeys = blockedTagKeysByMeasure.computeIfAbsent(measureName, blocked -> Sets.newHashSet());
if(blockedTagKeys.remove(tagKey))
agentHealthManager.invalidateIncident(this.getClass(), String.format(tagOverFlowResolvedMessageTemplate, tagKey));

Check warning on line 142 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L142

Added line #L142 was not covered by tests
tagValues.add(tagValue);
}
});
Expand Down Expand Up @@ -162,11 +177,17 @@ int getMaxValuesPerTag(String measureName, InspectitConfig config) {
.getMaxValuesPerTag());
}

/**
*
* @param context
* @param metricAccessor
* @return
*/
public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAccessor metricAccessor) {
Map<String, String> tags = Maps.newHashMap();
String measureName = metricAccessor.getName();
InspectitContextImpl inspectitContext = context.getInspectitContext();
Map<String, Boolean> blockedTagKeys = blockedTagKeysByMeasure.getOrDefault(measureName, Maps.newHashMap());
Set<String> blockedTagKeys = blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet());
TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard();

// first common tags to allow to overwrite by constant or data tags
Expand All @@ -177,7 +198,7 @@ public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAcce

// then constant tags to allow to overwrite by data
metricAccessor.getConstantTags().forEach((key, value) -> {
if (tagGuardSettings.isEnabled() && blockedTagKeys.containsKey(key)) {
if (tagGuardSettings.isEnabled() && blockedTagKeys.contains(key)) {
String overflowReplacement = env.getCurrentConfig().getMetrics().getTagGuard().getOverflowReplacement();
tags.put(key, TagUtils.createTagValueAsString(key, overflowReplacement));
} else {

Check warning on line 204 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L202-L204

Added lines #L202 - L204 were not covered by tests
Expand All @@ -187,7 +208,7 @@ public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAcce

// go over data tags and match the value to the key from the contextTags (if available)
metricAccessor.getDataTagAccessors().forEach((key, accessor) -> {
if (tagGuardSettings.isEnabled() && blockedTagKeys.containsKey(key)) {
if (tagGuardSettings.isEnabled() && blockedTagKeys.contains(key)) {
String overflowReplacement = env.getCurrentConfig().getMetrics().getTagGuard().getOverflowReplacement();
tags.put(key, TagUtils.createTagValueAsString(key, overflowReplacement));
} else {

Check warning on line 214 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/metrics/MeasureTagValueGuard.java#L212-L214

Added lines #L212 - L214 were not covered by tests
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package rocks.inspectit.ocelot.core.selfmonitoring;

import com.google.common.annotations.VisibleForTesting;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
Expand All @@ -8,14 +9,17 @@
import rocks.inspectit.ocelot.commons.models.health.AgentHealthIncident;
import rocks.inspectit.ocelot.core.config.InspectitEnvironment;
import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthChangedEvent;
import rocks.inspectit.ocelot.core.utils.RingBuffer;
import rocks.inspectit.ocelot.core.utils.AgentHealthIncidentBuffer;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Manages the {@link AgentHealth} and publishes {@link AgentHealthChangedEvent}s when it changes.
Expand All @@ -27,9 +31,11 @@ public class AgentHealthManager {

private final ApplicationContext ctx;

private final ScheduledExecutorService executor;

/**
* Map of {@code eventClass -> agentHealth}, whereas the {@code agentHealth} is reset whenever an event of type
* {@code eventClass} occurs (see {@link #onInvalidationEvent(Object)}.
* {@code eventClass} occurs (see {@link #onInvalidationEvent(Object)}).
* The resulting agent health is the most severe value in the map.
*/
private final Map<Class<?>, AgentHealth> invalidatableHealth = new ConcurrentHashMap<>();
Expand All @@ -44,24 +50,28 @@ public class AgentHealthManager {

private final InspectitEnvironment env;

private final RingBuffer<AgentHealthIncident> healthIncidentBuffer = new RingBuffer<>(10); //TODO make this configurable
private final AgentHealthIncidentBuffer healthIncidentBuffer;

public List<AgentHealthIncident> getHistory() {
return this.healthIncidentBuffer.asList();
@PostConstruct
@VisibleForTesting
void startHealthCheckScheduler() {
checkHealthAndSchedule();
}

public void notifyAgentHealth(AgentHealth eventHealth, Class<?> invalidator, String loggerName, String message) { //TODO test to check if the healthstate changes.
public List<AgentHealthIncident> getHistory() {
return healthIncidentBuffer.asList();
}

if (invalidator == null) {
public void notifyAgentHealth(AgentHealth eventHealth, Class<?> invalidator, String loggerName, String message) {
if (invalidator == null)
handleTimeoutHealth(eventHealth, loggerName, message);
} else {
else
handleInvalidatableHealth(eventHealth, invalidator, message);
}
}

public void handleInvalidatableHealth(AgentHealth eventHealth, Class<?> invalidator, String eventMessage) {
invalidatableHealth.merge(invalidator, eventHealth, AgentHealth::mostSevere);
triggerEvent(invalidator.getTypeName(), eventMessage);
triggerAgentHealthChangedEvent(invalidator.getTypeName(), eventMessage);
}

private void handleTimeoutHealth(AgentHealth eventHealth, String loggerName, String eventMassage) {
Expand All @@ -70,28 +80,60 @@ private void handleTimeoutHealth(AgentHealth eventHealth, String loggerName, Str
if (eventHealth.isMoreSevereOrEqualTo(AgentHealth.WARNING)) {
generalHealthTimeouts.put(eventHealth, LocalDateTime.now().plus(validityPeriod));
}
triggerEvent(loggerName, eventMassage + " This status is valid for " + validityPeriod);
triggerAgentHealthChangedEvent(loggerName, eventMassage + ". This status is valid for " + validityPeriod);
}

public void invalidateIncident(Class eventClass, String eventMessage) {
public void invalidateIncident(Class<?> eventClass, String eventMessage) {
invalidatableHealth.remove(eventClass);
triggerEvent(eventClass.getTypeName(), eventMessage);
triggerAgentHealthChangedEvent(eventClass.getTypeName(), eventMessage);
}

/**
* Checks whether the current health has changed since last check and schedules another check.
* The next check will run dependent on the earliest status timeout in the future:
* <ul>
* <li>does not exist -> run again after validity period</li>
* <li>exists -> run until that timeout is over</li>
* </ul>
*/
private void checkHealthAndSchedule() {
triggerAgentHealthChangedEvent(AgentHealthManager.class.getCanonicalName(), "Checking timed out agent healths");

Duration validityPeriod = env.getCurrentConfig().getSelfMonitoring().getAgentHealth().getValidityPeriod();
Duration minDelay = env.getCurrentConfig().getSelfMonitoring().getAgentHealth().getMinHealthCheckDelay();
Duration delay = generalHealthTimeouts.values()
.stream()
.filter(dateTime -> dateTime.isAfter(LocalDateTime.now()))
.max(Comparator.naturalOrder())
.map(dateTime -> {
Duration dif = Duration.between(dateTime, LocalDateTime.now());

Check warning on line 109 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/AgentHealthManager.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/AgentHealthManager.java#L109

Added line #L109 was not covered by tests
if (minDelay.compareTo(dif) > 0) return minDelay;
else return dif;

Check warning on line 111 in inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/AgentHealthManager.java

View check run for this annotation

Codecov / codecov/patch

inspectit-ocelot-core/src/main/java/rocks/inspectit/ocelot/core/selfmonitoring/AgentHealthManager.java#L111

Added line #L111 was not covered by tests
})
.orElse(validityPeriod);

executor.schedule(this::checkHealthAndSchedule, delay.toMillis(), TimeUnit.MILLISECONDS);
}

private void triggerEvent( String incidentSource, String message) {
/**
* Creates a new AgentHealthIncident and also triggers an AgentHealthChangedEvent, if the agent health has changed
* @param incidentSource class, which caused the incident
* @param message message, describing the incident
*/
private void triggerAgentHealthChangedEvent(String incidentSource, String message) {
synchronized (this) {
boolean changedHealth = false;
AgentHealth currHealth = getCurrentHealth();
AgentHealth lastHealth = lastNotifiedHealth;
if(healthChanged()) {
changedHealth = true;
lastNotifiedHealth = currHealth;
AgentHealthChangedEvent event = new AgentHealthChangedEvent(this, lastHealth, currHealth, message);
ctx.publishEvent(event);
}
boolean changedHealth = healthHasChanged();
AgentHealth currentHealth = getCurrentHealth();

AgentHealthIncident incident = new AgentHealthIncident(LocalDateTime.now().toString(), currHealth, incidentSource, message, changedHealth);
AgentHealthIncident incident = new AgentHealthIncident(LocalDateTime.now().toString(), currentHealth, incidentSource, message, changedHealth);
healthIncidentBuffer.put(incident);

if(changedHealth) {
AgentHealth lastHealth = lastNotifiedHealth;
lastNotifiedHealth = currentHealth;
AgentHealthChangedEvent event = new AgentHealthChangedEvent(this, lastHealth, currentHealth, message);
ctx.publishEvent(event);
}
}
}

Expand All @@ -100,7 +142,7 @@ private void triggerEvent( String incidentSource, String message) {
*
* @return true if the health state changed
*/
private boolean healthChanged() {
private boolean healthHasChanged() {
return getCurrentHealth() != lastNotifiedHealth;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import rocks.inspectit.ocelot.core.instrumentation.config.event.InstrumentationConfigurationChangedEvent;
import rocks.inspectit.ocelot.core.selfmonitoring.event.models.AgentHealthChangedEvent;

public interface HealthEventListener {
Expand Down
Loading

0 comments on commit ddd0dfd

Please sign in to comment.