Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-sampled live metrics #2647

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,11 @@ public AgentLogExporter(
@Nullable QuickPulse quickPulse,
BatchItemProcessor batchItemProcessor) {
this.severityThreshold = severityThreshold;
this.logSamplingOverrides = new SamplingOverrides(logSamplingOverrides);
this.exceptionSamplingOverrides = new SamplingOverrides(exceptionSamplingOverrides);
this.logSamplingOverrides = new SamplingOverrides(logSamplingOverrides, quickPulse);
this.exceptionSamplingOverrides = new SamplingOverrides(exceptionSamplingOverrides, quickPulse);
this.mapper = mapper;
telemetryItemConsumer =
telemetryItem -> {
if (quickPulse != null) {
quickPulse.add(telemetryItem);
}
TelemetryObservers.INSTANCE
.getObservers()
.forEach(consumer -> consumer.accept(telemetryItem));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.azure.monitor.opentelemetry.exporter.implementation.SpanDataMapper;
import com.azure.monitor.opentelemetry.exporter.implementation.logging.OperationLogger;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.QuickPulse;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings;
import com.microsoft.applicationinsights.agent.internal.telemetry.BatchItemProcessor;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryClient;
Expand All @@ -18,7 +17,6 @@
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,16 +30,10 @@ public final class AgentSpanExporter implements SpanExporter {
private final SpanDataMapper mapper;
private final Consumer<TelemetryItem> telemetryItemConsumer;

public AgentSpanExporter(
SpanDataMapper mapper,
@Nullable QuickPulse quickPulse,
BatchItemProcessor batchItemProcessor) {
public AgentSpanExporter(SpanDataMapper mapper, BatchItemProcessor batchItemProcessor) {
this.mapper = mapper;
telemetryItemConsumer =
telemetryItem -> {
if (quickPulse != null) {
quickPulse.add(telemetryItem);
}
TelemetryObservers.INSTANCE
.getObservers()
.forEach(consumer -> consumer.accept(telemetryItem));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import static java.util.concurrent.TimeUnit.SECONDS;

import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.QuickPulse;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.ThreadPoolUtils;
import com.microsoft.applicationinsights.agent.internal.classicsdk.BytecodeUtilImpl;
import com.microsoft.applicationinsights.agent.internal.configuration.Configuration;
Expand All @@ -31,17 +32,19 @@ public class RpConfigurationPolling implements Runnable {
private final Configuration configuration;
private final TelemetryClient telemetryClient;
private final AppIdSupplier appIdSupplier;
private final QuickPulse quickPulse;

public static void startPolling(
RpConfiguration rpConfiguration,
Configuration configuration,
TelemetryClient telemetryClient,
AppIdSupplier appIdSupplier) {
AppIdSupplier appIdSupplier,
QuickPulse quickPulse) {
Executors.newSingleThreadScheduledExecutor(
ThreadPoolUtils.createDaemonThreadFactory(RpConfigurationPolling.class))
.scheduleWithFixedDelay(
new RpConfigurationPolling(
rpConfiguration, configuration, telemetryClient, appIdSupplier),
rpConfiguration, configuration, telemetryClient, appIdSupplier, quickPulse),
60,
60,
SECONDS);
Expand All @@ -52,11 +55,13 @@ public static void startPolling(
RpConfiguration rpConfiguration,
Configuration configuration,
TelemetryClient telemetryClient,
AppIdSupplier appIdSupplier) {
AppIdSupplier appIdSupplier,
QuickPulse quickPulse) {
this.rpConfiguration = rpConfiguration;
this.configuration = configuration;
this.telemetryClient = telemetryClient;
this.appIdSupplier = appIdSupplier;
this.quickPulse = quickPulse;
}

@Override
Expand Down Expand Up @@ -112,7 +117,8 @@ public void run() {
if (changed) {
configuration.sampling.percentage = newRpConfiguration.sampling.percentage;
configuration.sampling.requestsPerSecond = newRpConfiguration.sampling.requestsPerSecond;
DelegatingSampler.getInstance().setDelegate(Samplers.getSampler(configuration));
DelegatingSampler.getInstance()
.setDelegate(Samplers.getSampler(configuration, quickPulse));
if (configuration.sampling.percentage != null) {
BytecodeUtilImpl.samplingPercentage = configuration.sampling.percentage.floatValue();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.azure.monitor.opentelemetry.exporter.implementation.SpanDataMapper;
import com.azure.monitor.opentelemetry.exporter.implementation.configuration.ConnectionString;
import com.azure.monitor.opentelemetry.exporter.implementation.heartbeat.HeartbeatExporter;
import com.azure.monitor.opentelemetry.exporter.implementation.livemetrics.LiveMetricsLogProcessor;
import com.azure.monitor.opentelemetry.exporter.implementation.livemetrics.LiveMetricsSpanProcessor;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.QuickPulse;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings;
Expand Down Expand Up @@ -180,20 +182,6 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
telemetryClient, SecondEntryPoint.agentLogExporter, appIdSupplier));
}

RpConfiguration rpConfiguration = FirstEntryPoint.getRpConfiguration();
if (rpConfiguration != null) {
RpConfigurationPolling.startPolling(
rpConfiguration, configuration, telemetryClient, appIdSupplier);
}

// initialize StatsbeatModule
statsbeatModule.start(telemetryClient, configuration);

AfterAgentListener.setAppIdSupplier(appIdSupplier);

// TODO (trask) add this method to AutoConfigurationCustomizer upstream?
((AutoConfiguredOpenTelemetrySdkBuilder) autoConfiguration).registerShutdownHook(false);

QuickPulse quickPulse;
if (configuration.preview.liveMetrics.enabled) {
quickPulse =
Expand All @@ -214,6 +202,20 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
}
telemetryClient.setQuickPulse(quickPulse);

RpConfiguration rpConfiguration = FirstEntryPoint.getRpConfiguration();
if (rpConfiguration != null) {
RpConfigurationPolling.startPolling(
rpConfiguration, configuration, telemetryClient, appIdSupplier, quickPulse);
}

// initialize StatsbeatModule
statsbeatModule.start(telemetryClient, configuration);

AfterAgentListener.setAppIdSupplier(appIdSupplier);

// TODO (trask) add this method to AutoConfigurationCustomizer upstream?
((AutoConfiguredOpenTelemetrySdkBuilder) autoConfiguration).registerShutdownHook(false);

autoConfiguration
.addPropertiesCustomizer(new AiConfigCustomizer())
.addSpanExporterCustomizer(
Expand Down Expand Up @@ -297,7 +299,7 @@ private static SdkTracerProviderBuilder configureTracing(
configuration.preview.additionalPropagators,
configuration.preview.legacyRequestIdPropagation.enabled);
}
DelegatingSampler.getInstance().setDelegate(Samplers.getSampler(configuration));
DelegatingSampler.getInstance().setDelegate(Samplers.getSampler(configuration, quickPulse));
} else {
// in Azure Functions, we configure later on, once we know user has opted in to tracing
// (note: the default for DelegatingPropagator is to not propagate anything
Expand Down Expand Up @@ -326,12 +328,20 @@ private static SdkTracerProviderBuilder configureTracing(
tracerProvider.addSpanProcessor(new AiLegacyHeaderSpanProcessor());
}

// live metrics span processor is used to add spans to quick pulse
SpanDataMapper mapper =
createSpanDataMapper(telemetryClient, configuration.preview.captureHttpServer4xxAsError);
if (quickPulse != null) {
tracerProvider.addSpanProcessor(new LiveMetricsSpanProcessor(mapper, quickPulse));
}

String tracesExporter = otelConfig.getString("otel.traces.exporter");
if ("none".equals(tracesExporter)) { // "none" is the default set in AiConfigCustomizer
SpanExporter spanExporter =
createSpanExporter(
telemetryClient, quickPulse, configuration.preview.captureHttpServer4xxAsError);

mapper,
telemetryClient.getGeneralBatchItemProcessor(),
telemetryClient.getStatsbeatModule());
spanExporter = wrapSpanExporter(spanExporter, configuration);

// using BatchSpanProcessor in order to get off of the application thread as soon as possible
Expand All @@ -346,37 +356,35 @@ private static SdkTracerProviderBuilder configureTracing(
return tracerProvider;
}

private static SpanExporter createSpanExporter(
TelemetryClient telemetryClient,
@Nullable QuickPulse quickPulse,
boolean captureHttpServer4xxAsError) {

SpanDataMapper mapper =
new SpanDataMapper(
captureHttpServer4xxAsError,
telemetryClient::populateDefaults,
(event, instrumentationName) -> {
boolean lettuce51 = instrumentationName.equals("io.opentelemetry.lettuce-5.1");
if (lettuce51 && event.getName().startsWith("redis.encode.")) {
// special case as these are noisy and come from the underlying library itself
return true;
}
boolean grpc16 = instrumentationName.equals("io.opentelemetry.grpc-1.6");
if (grpc16 && event.getName().equals("message")) {
// OpenTelemetry semantic conventions define semi-noisy grpc events
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/rpc.md#events
//
// we want to suppress these (at least by default)
return true;
}
return false;
});

BatchItemProcessor batchItemProcessor = telemetryClient.getGeneralBatchItemProcessor();
private static SpanDataMapper createSpanDataMapper(
TelemetryClient telemetryClient, boolean captureHttpServer4xxAsError) {
return new SpanDataMapper(
captureHttpServer4xxAsError,
telemetryClient::populateDefaults,
(event, instrumentationName) -> {
boolean lettuce51 = instrumentationName.equals("io.opentelemetry.lettuce-5.1");
if (lettuce51 && event.getName().startsWith("redis.encode.")) {
// special case as these are noisy and come from the underlying library itself
return true;
}
boolean grpc16 = instrumentationName.equals("io.opentelemetry.grpc-1.6");
if (grpc16 && event.getName().equals("message")) {
// OpenTelemetry semantic conventions define semi-noisy grpc events
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/rpc.md#events
//
// we want to suppress these (at least by default)
return true;
}
return false;
});
}

private static SpanExporter createSpanExporter(
SpanDataMapper mapper,
BatchItemProcessor batchItemProcessor,
StatsbeatModule statsbeatModule) {
return new StatsbeatSpanExporter(
new AgentSpanExporter(mapper, quickPulse, batchItemProcessor),
telemetryClient.getStatsbeatModule());
new AgentSpanExporter(mapper, batchItemProcessor), statsbeatModule);
}

private static SpanExporter wrapSpanExporter(
Expand Down Expand Up @@ -447,9 +455,19 @@ private static SdkLoggerProviderBuilder configureLogging(
// "ai.preview.service_name" being set programmatically on CONSUMER spans
builder.addLogRecordProcessor(new InheritedRoleNameLogProcessor());

// live metrics log processor is used to add logs to quick pulse when it's enabled
LogDataMapper mapper =
createLogDataMapper(
telemetryClient, configuration.preview.captureLoggingLevelAsCustomDimension);
if (quickPulse != null) {
builder.addLogRecordProcessor(new LiveMetricsLogProcessor(mapper, quickPulse));
}

String logsExporter = otelConfig.getString("otel.logs.exporter");
if ("none".equals(logsExporter)) { // "none" is the default set in AiConfigCustomizer
LogRecordExporter logExporter = createLogExporter(telemetryClient, quickPulse, configuration);
LogRecordExporter logExporter =
createLogExporter(
mapper, quickPulse, telemetryClient.getGeneralBatchItemProcessor(), configuration);

logExporter = wrapLogExporter(logExporter, configuration);

Expand All @@ -465,16 +483,17 @@ private static SdkLoggerProviderBuilder configureLogging(
return builder;
}

private static LogDataMapper createLogDataMapper(
TelemetryClient telemetryClient, boolean captureLoggingLevelAsCustomDimension) {
return new LogDataMapper(
captureLoggingLevelAsCustomDimension, telemetryClient::populateDefaults);
}

private static LogRecordExporter createLogExporter(
TelemetryClient telemetryClient,
LogDataMapper mapper,
@Nullable QuickPulse quickPulse,
BatchItemProcessor batchItemProcessor,
Configuration configuration) {

LogDataMapper mapper =
new LogDataMapper(
configuration.preview.captureLoggingLevelAsCustomDimension,
telemetryClient::populateDefaults);

List<Configuration.SamplingOverride> logSamplingOverrides =
configuration.preview.sampling.overrides.stream()
.filter(override -> override.telemetryType == SamplingTelemetryType.TRACE)
Expand All @@ -491,7 +510,7 @@ private static LogRecordExporter createLogExporter(
exceptionSamplingOverrides,
mapper,
quickPulse,
telemetryClient.getGeneralBatchItemProcessor());
batchItemProcessor);

return agentLogExporter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.microsoft.applicationinsights.agent.internal.sampling;

import com.azure.monitor.opentelemetry.exporter.implementation.RequestChecker;
import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.QuickPulse;
import com.microsoft.applicationinsights.agent.internal.configuration.Configuration.SamplingOverride;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
Expand All @@ -24,9 +25,11 @@ class AiOverrideSampler implements Sampler {
AiOverrideSampler(
List<SamplingOverride> requestSamplingOverrides,
List<SamplingOverride> dependencySamplingOverrides,
Sampler delegate) {
this.requestSamplingOverrides = new SamplingOverrides(requestSamplingOverrides);
this.dependencySamplingOverrides = new SamplingOverrides(dependencySamplingOverrides);
Sampler delegate,
QuickPulse quickPulse) {
this.requestSamplingOverrides = new SamplingOverrides(requestSamplingOverrides, quickPulse);
this.dependencySamplingOverrides =
new SamplingOverrides(dependencySamplingOverrides, quickPulse);
this.delegate = delegate;
}

Expand Down
Loading