Skip to content

Commit

Permalink
feat(core): OTel Exponential Delta Histograms (#1879)
Browse files Browse the repository at this point in the history
Open Telemetry Exponential Histograms Part 1 includes:
* Bucket definitions, ser/deser of buckets, along with storage in histogram vectors
* Summing exponential histograms of different schemas
* Delta exponential histograms - rate, sum and histogram_quantile functions tested
* PromQL function histogram_fraction
* Update of histogram_quantile to calculate quantile exponentially
* Test downsampling of exponential histograms
* Dev ingestion of synthetic data
* JMH benchmark for exponential histograms

Cumulative histograms not supported yet.
Negative observations not supported yet.
  • Loading branch information
vishramachandran authored Nov 13, 2024
1 parent e20b7dd commit 6afeb38
Show file tree
Hide file tree
Showing 22 changed files with 1,036 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ object NodeClusterActor {
* @param source the IngestionSource on each node. Use noOpSource to not start ingestion and
* manually push records into NodeCoordinator.
* @param storeConfig a StoreConfig for the MemStore.
* @param overrideSchema if true, will override the schema in the MetaStore with the schema in the Dataset.
* @return DatasetVerified - meaning the dataset and columns are valid. Does not mean ingestion is
* setup on all nodes - for that, subscribe to ShardMapUpdate's
*/
final case class SetupDataset(dataset: Dataset,
resources: DatasetResourceSpec,
source: IngestionSource,
storeConfig: StoreConfig,
downsampleConfig: DownsampleConfig = DownsampleConfig.disabled) {
downsampleConfig: DownsampleConfig = DownsampleConfig.disabled,
overrideSchema: Boolean = true) {
import collection.JavaConverters._
val resourceConfig = ConfigFactory.parseMap(
Map("num-shards" -> resources.numShards, "min-num-nodes" -> resources.minNumNodes).asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
}

def ingestHandlers: Receive = LoggingReceive {
case SetupDataset(dataset, resources, source, storeConf, downsample) =>
case SetupDataset(dataset, resources, source, storeConf, downsample, overrideSchema) =>
// used only in unit tests
if (!(ingesters contains dataset.ref)) {
setupDataset(dataset, storeConf, resources.numShards, source, downsample, true)
setupDataset(dataset, storeConf, resources.numShards, source, downsample, overrideSchema)
}

case IngestRows(dataset, shard, rows) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,7 @@ object ProtoConverters {
case filodb.query.InstantFunctionId.HistogramQuantile => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_QUANTILE
case filodb.query.InstantFunctionId.HistogramMaxQuantile => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_MAX_QUANTILE
case filodb.query.InstantFunctionId.HistogramBucket => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_BUCKET
case filodb.query.InstantFunctionId.HistogramFraction => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_FRACTION
case filodb.query.InstantFunctionId.Ln => GrpcMultiPartitionQueryService.InstantFunctionId.LN
case filodb.query.InstantFunctionId.Log10 => GrpcMultiPartitionQueryService.InstantFunctionId.LOG10
case filodb.query.InstantFunctionId.Log2 => GrpcMultiPartitionQueryService.InstantFunctionId.LOG2
Expand Down Expand Up @@ -929,6 +930,7 @@ object ProtoConverters {
case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_QUANTILE => filodb.query.InstantFunctionId.HistogramQuantile
case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_MAX_QUANTILE => filodb.query.InstantFunctionId.HistogramMaxQuantile
case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_BUCKET => filodb.query.InstantFunctionId.HistogramBucket
case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_FRACTION => filodb.query.InstantFunctionId.HistogramFraction
case GrpcMultiPartitionQueryService.InstantFunctionId.LN => filodb.query.InstantFunctionId.Ln
case GrpcMultiPartitionQueryService.InstantFunctionId.LOG10 => filodb.query.InstantFunctionId.Log10
case GrpcMultiPartitionQueryService.InstantFunctionId.LOG2 => filodb.query.InstantFunctionId.Log2
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/filodb.core/metadata/Schemas.scala
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ final case class Schemas(part: PartitionSchema,
}

private def bytesPerSampleSwagString = bytesPerSampleSwag.map { case (k, v) =>
s"${schemaName(k._1)} ColId:${k._2} : $v"
s"${schemaName(k._1)} ${k._1} ColId:${k._2} : $v"
}

Schemas._log.info(s"bytesPerSampleSwag: $bytesPerSampleSwagString")
Expand Down
27 changes: 27 additions & 0 deletions core/src/test/scala/filodb.core/TestData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ object MachineMetricsData {
Seq("timestamp:ts", "count:long", "sum:long", "h:hist:counter=false"),
options = DatasetOptions(shardKeyColumns = Seq("_ws_", "_ns_", "metric"), "metric"))

val expHistDataset = Dataset("histogram", Seq("metric:string", "tags:map"),
Seq("timestamp:ts", "count:long", "sum:long", "min:long", "max:long", "h:hist:counter=false"),
options = DatasetOptions(shardKeyColumns = Seq("_ws_", "_ns_", "metric"), "metric"))

var histBucketScheme: bv.HistogramBuckets = _
def linearHistSeries(startTs: Long = 100000L, numSeries: Int = 10, timeStep: Int = 1000, numBuckets: Int = 8,
infBucket: Boolean = false, ws: String = "demo"):
Expand Down Expand Up @@ -383,6 +387,29 @@ object MachineMetricsData {
}
}

def otelDeltaExponentialHistSeries(startTs: Long = 100000L, numSeries: Int = 10,
timeStep: Int = 10000, numBuckets: Int = 160, ws: String = "demo"):
Stream[Seq[Any]] = {
histBucketScheme = bv.Base2ExpHistogramBuckets(3, -20, numBuckets - 1)
val buckets = new Array[Long](numBuckets)
def updateBuckets(bucketNo: Int): Unit = {
for { b <- bucketNo until numBuckets } {
buckets(b) += 1
}
}
Stream.from(0).map { n =>
updateBuckets(n % numBuckets)
Seq(startTs + n * timeStep,
(1 + n).toLong,
buckets.sum,
buckets.min,
buckets.max,
bv.LongHistogram(histBucketScheme, buckets.map(x => x)),
"request-latency",
extraTags ++ Map("_ws_".utf8 -> ws.utf8, "_ns_".utf8 -> "testapp".utf8, "dc".utf8 -> s"${n % numSeries}".utf8))
}
}

// Data usable with prom-histogram schema
def linearPromHistSeries(startTs: Long = 100000L, numSeries: Int = 10, timeStep: Int = 1000, numBuckets: Int = 8):
Stream[Seq[Any]] = linearHistSeries(startTs, numSeries, timeStep, numBuckets).map { d =>
Expand Down
17 changes: 12 additions & 5 deletions gateway/src/main/scala/filodb/gateway/GatewayServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ object GatewayServer extends StrictLogging {
descrYes = "Generate otel-cumulative-histogram schema test data and exit")
val genOtelDeltaHistData = toggle(noshort = true,
descrYes = "Generate otel-delta-histogram schema test data and exit")
val genOtelExpDeltaHistData = toggle(noshort = true,
descrYes = "Generate otel-exponential-delta-histogram schema test data and exit")
val genGaugeData = toggle(noshort = true, descrYes = "Generate Prometheus gauge-schema test data and exit")
val genCounterData = toggle(noshort = true, descrYes = "Generate Prometheus counter-schema test data and exit")
val genDeltaCounterData = toggle(noshort = true, descrYes = "Generate delta-counter-schema test data and exit")
Expand Down Expand Up @@ -140,9 +142,11 @@ object GatewayServer extends StrictLogging {
val genDeltaCounterData = userOpts.genDeltaCounterData.getOrElse(false)
val genOtelCumulativeHistData = userOpts.genOtelCumulativeHistData.getOrElse(false)
val genOtelDeltaHistData = userOpts.genOtelDeltaHistData.getOrElse(false)
val genOtelExpDeltaHistData = userOpts.genOtelExpDeltaHistData.getOrElse(false)

if (genHist || genGaugeData || genDeltaHist
|| genCounterData || genDeltaCounterData || genOtelDeltaHistData || genOtelCumulativeHistData) {
|| genCounterData || genDeltaCounterData || genOtelDeltaHistData ||
genOtelExpDeltaHistData || genOtelCumulativeHistData) {
val startTime = System.currentTimeMillis
logger.info(s"Generating $numSamples samples starting at $startTime....")

Expand All @@ -151,7 +155,10 @@ object GatewayServer extends StrictLogging {
otelCumulativeHistogram)
else if (genOtelDeltaHistData) TestTimeseriesProducer.genHistogramData(startTime, numSeries,
otelDeltaHistogram)
else if (genDeltaHist) TestTimeseriesProducer.genHistogramData(startTime, numSeries, deltaHistogram)
else if (genOtelExpDeltaHistData) TestTimeseriesProducer.genHistogramData(startTime, numSeries,
otelDeltaHistogram, otelExponential = true)
else if (genDeltaHist)
TestTimeseriesProducer.genHistogramData(startTime, numSeries, deltaHistogram)
else if (genGaugeData) TestTimeseriesProducer.timeSeriesData(startTime, numSeries,
userOpts.numMetrics(), userOpts.publishIntervalSecs(), gauge)
else if (genDeltaCounterData) TestTimeseriesProducer.timeSeriesData(startTime, numSeries,
Expand All @@ -170,8 +177,8 @@ object GatewayServer extends StrictLogging {
}
Thread sleep 10000
TestTimeseriesProducer.logQueryHelp(dataset.name, userOpts.numMetrics(), numSamples, numSeries,
startTime, genHist, genDeltaHist, genGaugeData, genCounterData, genOtelCumulativeHistData, genOtelDeltaHistData,
userOpts.publishIntervalSecs())
startTime, genHist, genDeltaHist, genGaugeData, genCounterData, genOtelCumulativeHistData,
genOtelDeltaHistData, genOtelExpDeltaHistData, userOpts.publishIntervalSecs())
logger.info(s"Waited for containers to be sent, exiting...")
sys.exit(0)
} else {
Expand Down Expand Up @@ -260,7 +267,7 @@ object GatewayServer extends StrictLogging {
producing(shard) = false
output
}
}
}.filter { case (_, j) => j.nonEmpty }
logger.info(s"Created $numShards container builder queues with $parallelism parallel workers...")
(shardQueues, containerStream)
}
Expand Down
62 changes: 60 additions & 2 deletions gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import remote.RemoteStorage.TimeSeries
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.{DatasetOptions, Schema}
import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String => ZCUTF8}
import filodb.memory.format.vectors.{CustomBuckets, LongHistogram}
import filodb.memory.format.vectors.{Base2ExpHistogramBuckets, CustomBuckets, LongHistogram}

/**
* An InputRecord represents one "record" of timeseries data for input to FiloDB system.
Expand Down Expand Up @@ -182,7 +182,7 @@ object InputRecord {
}

/**
* Writes a delta non-increasing histogram record, along with the sum, count, min and max
* Writes a delta non-decreasing histogram record, along with the sum, count, min and max
*/
def writeOtelDeltaHistRecord(builder: RecordBuilder,
metric: String,
Expand Down Expand Up @@ -230,6 +230,64 @@ object InputRecord {
}
}

/**
* Writes a non-decreasing histogram record, along with the sum, count, min and max
*/
//scalastyle:off method.length
def writeOtelExponentialHistRecord(builder: RecordBuilder,
metric: String,
tags: Map[String, String],
timestamp: Long,
kvs: Seq[(String, Double)],
isDelta: Boolean): Unit = {
var sum = Double.NaN
var count = Double.NaN
var min = Double.NaN
var max = Double.NaN
var posBucketOffset = Double.NaN
var scale = Double.NaN

// Filter out sum and count, then convert and sort buckets
val sortedBuckets = kvs.filter {
case ("sum", v) => sum = v
false
case ("count", v) => count = v
false
case ("min", v) => min = v
false
case ("max", v) => max = v
false
case ("posBucketOffset", v) => posBucketOffset = v
false
case ("scale", v) => scale = v
false
case other => true
}.map {
case (k, v) => (k.toInt, v.toLong)
}.sorted

val bucketValues = sortedBuckets.map(_._2).toArray

if (sortedBuckets.nonEmpty) {
// length - 1 because the zero bucket is not included in the positive bucket count
val buckets = Base2ExpHistogramBuckets(scale.toInt, posBucketOffset.toInt, sortedBuckets.length - 1)
val hist = LongHistogram(buckets, bucketValues)

// Now, write out histogram
builder.startNewRecord(if (isDelta) otelDeltaHistogram else otelCumulativeHistogram)
builder.addLong(timestamp)
builder.addDouble(sum)
builder.addDouble(count)
builder.addBlob(hist.serialize())
builder.addDouble(min)
builder.addDouble(max)

builder.addString(metric)
builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
builder.endRecord()
}
}

/**
* Writes a delta non-increasing histogram record, along with the sum and count,
* using the delta-histogram schema, storing the entire histogram together for efficiency.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,17 @@ object TestTimeseriesProducer extends StrictLogging {
//scalastyle:off method.length parameter.number
def logQueryHelp(dataset: String, numMetrics: Int, numSamples: Int, numTimeSeries: Int, startTimeMs: Long,
genHist: Boolean, genDeltaHist: Boolean, genGauge: Boolean,
genPromCounter: Boolean, genOtelCumulativeHistData: Boolean, genOtelDeltaHistData: Boolean,
genPromCounter: Boolean, genOtelCumulativeHistData: Boolean,
genOtelDeltaHistData: Boolean, genOtelExpDeltaHistData: Boolean,
publishIntervalSec: Int): Unit = {
val startQuery = startTimeMs / 1000
val endQuery = startQuery + (numSamples / numMetrics / numTimeSeries) * publishIntervalSec
logger.info(s"Finished producing $numSamples records for ${(endQuery-startQuery).toDouble/60} minutes")

val metricName = if (genGauge) "heap_usage0"
else if (genHist || genOtelCumulativeHistData) "http_request_latency"
else if (genDeltaHist || genOtelDeltaHistData) "http_request_latency_delta"
else if (genDeltaHist || genOtelDeltaHistData || genOtelExpDeltaHistData)
"http_request_latency_delta"
else if (genPromCounter) "heap_usage_counter0"
else "heap_usage_delta0"

Expand All @@ -98,15 +100,19 @@ object TestTimeseriesProducer extends StrictLogging {
dataset: Dataset,
shardMapper: ShardMapper,
spread: Int,
publishIntervalSec: Int): (Future[Unit], Observable[(Int, Seq[Array[Byte]])]) = {
publishIntervalSec: Int,
expHist: Boolean = false,
numBuckets: Int = 20): (Future[Unit], Observable[(Int, Seq[Array[Byte]])]) = {
val (shardQueues, containerStream) = GatewayServer.shardingPipeline(GlobalConfig.systemConfig, numShards, dataset)

val producingFut = Future {
timeSeriesData(startTimeMs, numTimeSeries, numMetricNames, publishIntervalSec, gauge)
.take(numSamples)
val data = if (expHist) genHistogramData(startTimeMs, numTimeSeries,
Schemas.otelDeltaHistogram, numBuckets = numBuckets, otelExponential = true)
else timeSeriesData(startTimeMs, numTimeSeries, numMetricNames, publishIntervalSec, gauge)
data.take(numSamples)
.foreach { rec =>
val shard = shardMapper.ingestionShard(rec.shardKeyHash, rec.partitionKeyHash, spread)
while (!shardQueues(shard).offer(rec)) { Thread sleep 50 }
while (!shardQueues(shard).offer(rec)) { Thread sleep 50 }
}
}
(producingFut, containerStream)
Expand Down Expand Up @@ -203,18 +209,20 @@ object TestTimeseriesProducer extends StrictLogging {
* Note: the set of "instance" tags is unique for each invocation of genHistogramData. This helps increase
* the cardinality of time series for testing purposes.
*/
def genHistogramData(startTime: Long, numTimeSeries: Int = 16, histSchema: Schema): Stream[InputRecord] = {
val numBuckets = 10
val histBucketScheme = bv.GeometricBuckets(2.0, 3.0, numBuckets)
var buckets = new Array[Long](numBuckets)
def genHistogramData(startTime: Long, numTimeSeries: Int = 16, histSchema: Schema,
numBuckets : Int = 20,
otelExponential: Boolean = false): Stream[InputRecord] = {
val histBucketScheme = if (otelExponential) bv.Base2ExpHistogramBuckets(3, -numBuckets/2, numBuckets)
else bv.GeometricBuckets(2.0, 3.0, numBuckets)
var buckets = new Array[Long](histBucketScheme.numBuckets)
val metric = if (Schemas.deltaHistogram == histSchema || Schemas.otelDeltaHistogram == histSchema) {
"http_request_latency_delta"
} else {
"http_request_latency"
}

def updateBuckets(bucketNo: Int): Unit = {
for { b <- bucketNo until numBuckets } {
for { b <- bucketNo until histBucketScheme.numBuckets } {
buckets(b) += 1
}
}
Expand All @@ -232,9 +240,9 @@ object TestTimeseriesProducer extends StrictLogging {
if ( (Schemas.deltaHistogram == histSchema || Schemas.otelDeltaHistogram == histSchema )
&& prevTimestamp != timestamp) {
prevTimestamp = timestamp
buckets = new Array[Long](numBuckets)
buckets = new Array[Long](histBucketScheme.numBuckets)
}
updateBuckets(n % numBuckets)
updateBuckets(n % histBucketScheme.numBuckets)
val hist = bv.LongHistogram(histBucketScheme, buckets.map(x => x))
val count = util.Random.nextInt(100).toDouble
val sum = buckets.sum.toDouble
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ package filodb.gateway.conversion
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.Schemas
import filodb.memory.MemFactory
import filodb.memory.format.vectors.{CustomBuckets, LongHistogram}
import filodb.memory.format.vectors.{Base2ExpHistogramBuckets, CustomBuckets, LongHistogram}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

class InputRecordBuilderSpec extends AnyFunSpec with Matchers {
val builder = new RecordBuilder(MemFactory.onHeapFactory)
val builder2 = new RecordBuilder(MemFactory.onHeapFactory)
val builder3 = new RecordBuilder(MemFactory.onHeapFactory)

val baseTags = Map("dataset" -> "timeseries",
"host" -> "MacBook-Pro-229.local",
Expand Down Expand Up @@ -97,6 +98,32 @@ class InputRecordBuilderSpec extends AnyFunSpec with Matchers {
}
}

it("should otelExpDeltaHistogram to BR and be able to deserialize it") {
val bucketScheme = Base2ExpHistogramBuckets(3, -5, 10)
val bucketsCounts = Array(1L, 2, 3,4, 5, 6, 7, 8, 9, 10, 11) // require cumulative counts
val expected = LongHistogram(bucketScheme, bucketsCounts)

val bucketKVs = bucketsCounts.zipWithIndex.map {
case (bucketCount, i) => i.toString -> bucketCount.toDouble
}.toSeq

// add posBucketOffset and scale
val more = Seq("posBucketOffset" -> bucketScheme.startIndexPositiveBuckets.toDouble,
"scale" -> bucketScheme.scale.toDouble)

InputRecord.writeOtelExponentialHistRecord(builder3, metric, baseTags, 100000L,
bucketKVs ++ sumCountMinMaxKVs ++ more, isDelta = true)
builder3.allContainers.head.iterate(Schemas.otelDeltaHistogram.ingestionSchema).foreach { row =>
row.getDouble(1) shouldEqual sum
row.getDouble(2) shouldEqual count
row.getDouble(4) shouldEqual min
row.getDouble(5) shouldEqual max
val hist = row.getHistogram(3).asInstanceOf[LongHistogram]
hist.buckets shouldEqual expected.buckets
hist.values shouldEqual Array(1L, 2, 3,4, 5, 6, 7, 8, 9, 10, 11)
}
}

it("should skip empty histograms via writePromHistRecord, and write subsequent records") {
builder.reset()
InputRecord.writePromHistRecord(builder, metric, baseTags, 100000L, sumCountKVs)
Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/protobuf/query_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ enum InstantFunctionId {
MONTH = 20;
YEAR = 21;
OR_VECTOR_DOUBLE = 22;
HISTOGRAM_FRACTION = 23;
}

enum ScalarFunctionId {
Expand Down
Loading

0 comments on commit 6afeb38

Please sign in to comment.