From f664dbbbc315f86537b043de7747c1b76b0660ca Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Wed, 29 May 2024 10:23:28 -0400 Subject: [PATCH] [internal/exp/metrics] Add functions to merge metrics (#32794) **Description:** This will merge the metrics in mdB into mdA, re-using existing resourceMetrics, scopeMetrics, and metric entries where possible. This will be used to help implement the new feature for: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32513 **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32513 / https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/32690 **Testing:** I added a unit test that covers the various merge scenarios. **Documentation:** The exported function is documented in standard Go style, and there are comments inside the code explaining what is going on and why --------- Co-authored-by: Ziqi Zhao --- internal/exp/metrics/go.mod | 7 + internal/exp/metrics/go.sum | 2 + internal/exp/metrics/metrics.go | 124 ++++++++++++ internal/exp/metrics/metrics_test.go | 177 +++++++++++++++++ internal/exp/metrics/streams/streams.go | 18 +- .../metrics/testdata/a_duplicate_data/a.yaml | 79 ++++++++ .../metrics/testdata/a_duplicate_data/b.yaml | 77 ++++++++ .../testdata/a_duplicate_data/output.yaml | 93 +++++++++ .../exp/metrics/testdata/basic_merge/a.yaml | 83 ++++++++ .../exp/metrics/testdata/basic_merge/b.yaml | 177 +++++++++++++++++ .../metrics/testdata/basic_merge/output.yaml | 187 ++++++++++++++++++ processor/deltatocumulativeprocessor/go.mod | 4 +- 12 files changed, 1025 insertions(+), 3 deletions(-) create mode 100644 internal/exp/metrics/metrics.go create mode 100644 internal/exp/metrics/metrics_test.go create mode 100644 internal/exp/metrics/testdata/a_duplicate_data/a.yaml create mode 100644 internal/exp/metrics/testdata/a_duplicate_data/b.yaml create mode 100644 internal/exp/metrics/testdata/a_duplicate_data/output.yaml create mode 100644 internal/exp/metrics/testdata/basic_merge/a.yaml create mode 100644 internal/exp/metrics/testdata/basic_merge/b.yaml create mode 100644 internal/exp/metrics/testdata/basic_merge/output.yaml diff --git a/internal/exp/metrics/go.mod b/internal/exp/metrics/go.mod index d400220ebcd9..85cc042fa47a 100644 --- a/internal/exp/metrics/go.mod +++ b/internal/exp/metrics/go.mod @@ -3,9 +3,12 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/me go 1.21.0 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.101.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.101.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.101.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/pdata v1.8.1-0.20240527192838-af4fdd4e342a + go.opentelemetry.io/collector/semconv v0.101.1-0.20240527192838-af4fdd4e342a ) require ( @@ -27,3 +30,7 @@ ) replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../../pkg/pdatatest diff --git a/internal/exp/metrics/go.sum b/internal/exp/metrics/go.sum index 95edfe997687..2e6d7f733fba 100644 --- a/internal/exp/metrics/go.sum +++ b/internal/exp/metrics/go.sum @@ -33,6 +33,8 @@ github.com/yuin/goldmark v1.1.27/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/collector/pdata v1.8.1-0.20240527192838-af4fdd4e342a h1:ODREkUcnSXEoJz7QzPbnDlVcQQ0qnxSyAgiRciSXWbU= go.opentelemetry.io/collector/pdata v1.8.1-0.20240527192838-af4fdd4e342a/go.mod h1:vk7LrfpyVpGZrRWcpjyy0DDZzL3SZiYMQxfap25551w= +go.opentelemetry.io/collector/semconv v0.101.1-0.20240527192838-af4fdd4e342a h1:6ZhX+c07YiQKr6Z+lbUJE/LoCtr6RzeJNoKxz/j0kEw= +go.opentelemetry.io/collector/semconv v0.101.1-0.20240527192838-af4fdd4e342a/go.mod h1:Egag6eTQ2oHrDRssbZTCOlU3NnVdKVtbLpeWQrBRXt8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= diff --git a/internal/exp/metrics/metrics.go b/internal/exp/metrics/metrics.go new file mode 100644 index 000000000000..cb931225043e --- /dev/null +++ b/internal/exp/metrics/metrics.go @@ -0,0 +1,124 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics" + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" ) + +// Merge will merge the metrics data in mdB into mdA, then return mdA. +// mdB will not be modified. The function will attempt to merge the data in mdB into +// existing ResourceMetrics / ScopeMetrics / Metrics in mdA if possible. If they don't +// exist, new entries will be created as needed. +// +// NOTE: Any "unnecessary" duplicate entries in mdA will *not* be combined. For example, if +// mdA contains two ResourceMetrics entries with identical Resource values, they will not be +// combined. 
If you wish to have this behavior, you could call this function twice: +// +// cleanedMetrics := Merge(pmetric.NewMetrics(), mdA) +// Merge(cleanedMetrics, mdB) +// +// That said, this will do a large amount of memory copying +func Merge(mdA pmetric.Metrics, mdB pmetric.Metrics) pmetric.Metrics { +outer: + for i := 0; i < mdB.ResourceMetrics().Len(); i++ { + rmB := mdB.ResourceMetrics().At(i) + resourceIDB := identity.OfResource(rmB.Resource()) + + for j := 0; j < mdA.ResourceMetrics().Len(); j++ { + rmA := mdA.ResourceMetrics().At(j) + resourceIDA := identity.OfResource(rmA.Resource()) + + if resourceIDA == resourceIDB { + mergeResourceMetrics(resourceIDA, rmA, rmB) + continue outer + } + } + + // We didn't find a match + // Add it to mdA + newRM := mdA.ResourceMetrics().AppendEmpty() + rmB.CopyTo(newRM) + } + + return mdA +} + +func mergeResourceMetrics(resourceID identity.Resource, rmA pmetric.ResourceMetrics, rmB pmetric.ResourceMetrics) pmetric.ResourceMetrics { +outer: + for i := 0; i < rmB.ScopeMetrics().Len(); i++ { + smB := rmB.ScopeMetrics().At(i) + scopeIDB := identity.OfScope(resourceID, smB.Scope()) + + for j := 0; j < rmA.ScopeMetrics().Len(); j++ { + smA := rmA.ScopeMetrics().At(j) + scopeIDA := identity.OfScope(resourceID, smA.Scope()) + + if scopeIDA == scopeIDB { + mergeScopeMetrics(scopeIDA, smA, smB) + continue outer + } + } + + // We didn't find a match + // Add it to rmA + newSM := rmA.ScopeMetrics().AppendEmpty() + smB.CopyTo(newSM) + } + + return rmA +} + +func mergeScopeMetrics(scopeID identity.Scope, smA pmetric.ScopeMetrics, smB pmetric.ScopeMetrics) pmetric.ScopeMetrics { +outer: + for i := 0; i < smB.Metrics().Len(); i++ { + mB := smB.Metrics().At(i) + metricIDB := identity.OfMetric(scopeID, mB) + + for j := 0; j < smA.Metrics().Len(); j++ { + mA := smA.Metrics().At(j) + metricIDA := identity.OfMetric(scopeID, mA) + + if metricIDA == metricIDB { + //exhaustive:enforce + switch mA.Type() { + case pmetric.MetricTypeGauge: + mergeDataPoints(mA.Gauge().DataPoints(), mB.Gauge().DataPoints()) + case pmetric.MetricTypeSum: + mergeDataPoints(mA.Sum().DataPoints(), mB.Sum().DataPoints()) + case pmetric.MetricTypeHistogram: + mergeDataPoints(mA.Histogram().DataPoints(), mB.Histogram().DataPoints()) + case pmetric.MetricTypeExponentialHistogram: + mergeDataPoints(mA.ExponentialHistogram().DataPoints(), mB.ExponentialHistogram().DataPoints()) + case pmetric.MetricTypeSummary: + mergeDataPoints(mA.Summary().DataPoints(), mB.Summary().DataPoints()) + } + + continue outer + } + } + + // We didn't find a match + // Add it to smA + newM := smA.Metrics().AppendEmpty() + mB.CopyTo(newM) + } + + return smA +} + +func mergeDataPoints[DPS streams.DataPointSlice[DP], DP streams.DataPoint[DP]](dataPointsA DPS, dataPointsB DPS) DPS { + // Append all the datapoints from B to A + for i := 0; i < dataPointsB.Len(); i++ { + dpB := dataPointsB.At(i) + + newDP := dataPointsA.AppendEmpty() + dpB.CopyTo(newDP) + } + + return dataPointsA +} diff --git a/internal/exp/metrics/metrics_test.go b/internal/exp/metrics/metrics_test.go new file mode 100644 index 000000000000..df49b71aae60 --- /dev/null +++ b/internal/exp/metrics/metrics_test.go @@ -0,0 +1,177 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics_test + +import ( + "math/rand" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + conventions 
"go.opentelemetry.io/collector/semconv/v1.9.0" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" +) + +func TestMergeMetrics(t *testing.T) { + t.Parallel() + + testCases := []string{ + "basic_merge", + "a_duplicate_data", + } + + for _, tc := range testCases { + testName := tc + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + dir := filepath.Join("testdata", testName) + + mdA, err := golden.ReadMetrics(filepath.Join(dir, "a.yaml")) + require.NoError(t, err) + + mdB, err := golden.ReadMetrics(filepath.Join(dir, "b.yaml")) + require.NoError(t, err) + + expectedOutput, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml")) + require.NoError(t, err) + + output := metrics.Merge(mdA, mdB) + require.NoError(t, pmetrictest.CompareMetrics(expectedOutput, output)) + }) + } +} + +func naiveMerge(mdA pmetric.Metrics, mdB pmetric.Metrics) pmetric.Metrics { + for i := 0; i < mdB.ResourceMetrics().Len(); i++ { + rm := mdB.ResourceMetrics().At(i) + + rmCopy := mdA.ResourceMetrics().AppendEmpty() + rm.CopyTo(rmCopy) + } + + return mdA +} + +func BenchmarkMergeManyIntoSingle(b *testing.B) { + benchmarks := []struct { + name string + mergeFunc func(mdA pmetric.Metrics, mdB pmetric.Metrics) pmetric.Metrics + }{ + { + name: "Naive", + mergeFunc: naiveMerge, + }, + { + name: "Deduplicating", + mergeFunc: metrics.Merge, + }, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + // Make mdA just be a single resource metric with a single scope metric and a single metric + mdAClean := generateMetrics(b, 1) + mdB := generateMetrics(b, 10000) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + mdA := pmetric.NewMetrics() + mdAClean.CopyTo(mdA) + b.StartTimer() + + bm.mergeFunc(mdA, mdB) + } + }) + } +} + +func BenchmarkMergeManyIntoMany(b *testing.B) { + benchmarks := []struct { + name string + mergeFunc func(mdA pmetric.Metrics, mdB pmetric.Metrics) pmetric.Metrics + }{ + { + name: "Naive", + mergeFunc: naiveMerge, + }, + { + name: "Deduplicating", + mergeFunc: metrics.Merge, + }, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + mdAClean := generateMetrics(b, 10000) + mdB := generateMetrics(b, 10000) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + mdA := pmetric.NewMetrics() + mdAClean.CopyTo(mdA) + b.StartTimer() + + bm.mergeFunc(mdA, mdB) + } + }) + } +} + +// generateMetrics creates a pmetric.Metrics instance with `rmCount` resourceMetrics. +// Each resource metric is the same as the others, each scope metric is the same +// as the others, each metric is the same as the others. 
But the datapoints are different +func generateMetrics(t require.TestingT, rmCount int) pmetric.Metrics { + md := pmetric.NewMetrics() + + timeStamp := pcommon.Timestamp(rand.Intn(256)) + value := int64(rand.Intn(256)) + + for i := 0; i < rmCount; i++ { + rm := md.ResourceMetrics().AppendEmpty() + err := rm.Resource().Attributes().FromRaw(map[string]any{ + conventions.AttributeServiceName: "service-test", + }) + require.NoError(t, err) + + sm := rm.ScopeMetrics().AppendEmpty() + scope := sm.Scope() + scope.SetName("MyTestInstrument") + scope.SetVersion("1.2.3") + err = scope.Attributes().FromRaw(map[string]any{ + "scope.key": "scope-test", + }) + require.NoError(t, err) + + m := sm.Metrics().AppendEmpty() + m.SetName("metric.test") + + sum := m.SetEmptySum() + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + sum.SetIsMonotonic(true) + + dp := sum.DataPoints().AppendEmpty() + + dp.SetTimestamp(timeStamp) + timeStamp += 10 + + dp.SetIntValue(value) + value += 15 + + err = dp.Attributes().FromRaw(map[string]any{ + "datapoint.key": "dp-test", + }) + require.NoError(t, err) + } + + return md +} diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go index 8e64ae0e0305..5f0d715b6962 100644 --- a/internal/exp/metrics/streams/streams.go +++ b/internal/exp/metrics/streams/streams.go @@ -3,7 +3,11 @@ package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" -import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) // Sequence of streams that can be iterated upon type Seq[T any] func(yield func(identity.Stream, T) bool) bool @@ -63,3 +67,15 @@ func (m HashMap[T]) Clear() { type Evictor interface { Evict() (gone identity.Stream, ok bool) } + +type DataPointSlice[DP DataPoint[DP]] interface { + Len() int + At(i int) DP + AppendEmpty() DP +} + +type DataPoint[Self any] interface { + Timestamp() pcommon.Timestamp + Attributes() pcommon.Map + CopyTo(dest Self) +} diff --git a/internal/exp/metrics/testdata/a_duplicate_data/a.yaml b/internal/exp/metrics/testdata/a_duplicate_data/a.yaml new file mode 100644 index 000000000000..7ad88630d11a --- /dev/null +++ b/internal/exp/metrics/testdata/a_duplicate_data/a.yaml @@ -0,0 +1,79 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + # This entry has an identical scope, so it *could* be merged with the above entry. 
+ # However, the initial structure of mdA is left as-is + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 60 + asDouble: 444 + attributes: + - key: aaa + value: + stringValue: bbb + # This entry has an identical resource_key, so it *could* be merged with the above entry. + # However, the initial structure of mdA is left as-is + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 70 + asDouble: 555 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/internal/exp/metrics/testdata/a_duplicate_data/b.yaml b/internal/exp/metrics/testdata/a_duplicate_data/b.yaml new file mode 100644 index 000000000000..cd767cd03f1e --- /dev/null +++ b/internal/exp/metrics/testdata/a_duplicate_data/b.yaml @@ -0,0 +1,77 @@ +resourceMetrics: + # These will all be collapsed into datapoints in the first resourceMetrics and first scopeMetrics instance + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 55 + asDouble: 345 + attributes: + - key: aaa + value: + stringValue: bbb + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 65 + asDouble: 450 + attributes: + - key: aaa + value: + stringValue: bbb + # This will also be collapsed into the first entry in mdA as a datapoint + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 70 + asDouble: 555 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/internal/exp/metrics/testdata/a_duplicate_data/output.yaml b/internal/exp/metrics/testdata/a_duplicate_data/output.yaml new file mode 100644 index 000000000000..77f57ad57bc1 --- /dev/null +++ b/internal/exp/metrics/testdata/a_duplicate_data/output.yaml @@ -0,0 +1,93 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: 
+ stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 55 + asDouble: 345 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 65 + asDouble: 450 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 70 + asDouble: 555 + attributes: + - key: aaa + value: + stringValue: bbb + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 60 + asDouble: 444 + attributes: + - key: aaa + value: + stringValue: bbb + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 70 + asDouble: 555 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/internal/exp/metrics/testdata/basic_merge/a.yaml b/internal/exp/metrics/testdata/basic_merge/a.yaml new file mode 100644 index 000000000000..6601a17ec563 --- /dev/null +++ b/internal/exp/metrics/testdata/basic_merge/a.yaml @@ -0,0 +1,83 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: foo + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - name: gauge + gauge: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + asDouble: 345 + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.histogram + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [4, 7, 9, 6, 25] + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.exphistogram + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [4, 7, 9, 6, 25] + negative: + offset: 6 + bucketCounts: [2, 13, 7, 12, 4] + attributes: + - key: aaa + value: + stringValue: bbb + - name: summary + summary: + dataPoints: + - timeUnixNano: 50 + quantileValues: + - quantile: 0.25 + value: 50 + - quantile: 0.5 + value: 20 + - quantile: 0.75 + value: 75 + - quantile: 0.95 + value: 10 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/internal/exp/metrics/testdata/basic_merge/b.yaml b/internal/exp/metrics/testdata/basic_merge/b.yaml new file mode 100644 index 000000000000..059e2d9e3e14 --- /dev/null +++ b/internal/exp/metrics/testdata/basic_merge/b.yaml @@ -0,0 +1,177 @@ +resourceMetrics: + # The first entry is an identical resource metrics ID + # But different scope metrics ID + # And identical metric ID / stream data + # We 
should end up with a new scope metric entry + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + # The next entry has a different resource_key, but identical scope and metric ID / data + # We should end up with a new resource metric entry + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: bar + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: foo + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + # The next entry has identical resource and scope IDs + # But a different metric name / type + # We should get a new metric entry added to the original scope metric entry + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: foo + metrics: + - name: delta.monotonic.sum + sum: + aggregationTemporality: 1 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + # The next entry has identical resource, scope, and metric IDs + # The metric datapoint should be added to the existing metric + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: foo + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 80 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + # Merge in some different metric types + # Which have identical metric IDs. 
So the datapoints should be merged + - name: gauge + gauge: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 120 + asDouble: 255 + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.histogram + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 60 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [5, 8, 10, 7, 30] + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.exphistogram + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 60 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [5, 8, 10, 7, 30] + negative: + offset: 6 + bucketCounts: [3, 14, 8, 13, 5] + attributes: + - key: aaa + value: + stringValue: bbb + - name: summary + summary: + dataPoints: + - timeUnixNano: 60 + quantileValues: + - quantile: 0.25 + value: 60 + - quantile: 0.5 + value: 30 + - quantile: 0.75 + value: 80 + - quantile: 0.95 + value: 15 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/internal/exp/metrics/testdata/basic_merge/output.yaml b/internal/exp/metrics/testdata/basic_merge/output.yaml new file mode 100644 index 000000000000..7cef49b431d2 --- /dev/null +++ b/internal/exp/metrics/testdata/basic_merge/output.yaml @@ -0,0 +1,187 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: foo + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 80 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - name: gauge + gauge: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + asDouble: 345 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 120 + asDouble: 255 + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.histogram + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [4, 7, 9, 6, 25] + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 60 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [5, 8, 10, 7, 30] + attributes: + - key: aaa + value: + stringValue: bbb + - name: cumulative.exphistogram + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [4, 7, 9, 6, 25] + negative: + offset: 6 + bucketCounts: [2, 13, 7, 12, 4] + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 60 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [5, 8, 10, 7, 30] + negative: + offset: 6 + bucketCounts: [3, 14, 8, 13, 5] + attributes: + - key: aaa + value: + stringValue: bbb + - name: summary + summary: + dataPoints: + - timeUnixNano: 50 + quantileValues: + - quantile: 0.25 + value: 50 + - quantile: 0.5 + value: 20 + - quantile: 0.75 + value: 75 + - quantile: 0.95 + value: 10 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 60 + quantileValues: + - quantile: 0.25 + value: 60 + - quantile: 0.5 + value: 30 + - quantile: 0.75 + value: 80 + - quantile: 0.95 + value: 15 + attributes: + - key: aaa + value: + stringValue: bbb + - name: 
delta.monotonic.sum + sum: + aggregationTemporality: 1 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: bar + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: resource_key + value: + stringValue: bar + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: scope_key + value: + stringValue: foo + metrics: + - name: cumulative.monotonic.sum + sum: + aggregationTemporality: 2 + isMonotonic: true + dataPoints: + - timeUnixNano: 50 + asDouble: 333 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod index 076955ffa561..450dea45491b 100644 --- a/processor/deltatocumulativeprocessor/go.mod +++ b/processor/deltatocumulativeprocessor/go.mod @@ -59,8 +59,8 @@ require ( replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest - replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
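
Reviewer note (not part of the patch): below is a minimal sketch of how an in-repo component could exercise the new helper. Only `metrics.Merge` and the `pdata` API calls come from this change and the collector's pdata package; the `newBatch` helper, metric names, and values are illustrative assumptions, and since `internal/exp/metrics` is an internal package it is importable only from within opentelemetry-collector-contrib.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics"
)

// newBatch is a hypothetical helper that builds one cumulative monotonic sum
// with a single datapoint, mirroring the shape used in the golden testdata.
func newBatch(service string, ts pcommon.Timestamp, value int64) pmetric.Metrics {
	md := pmetric.NewMetrics()
	rm := md.ResourceMetrics().AppendEmpty()
	rm.Resource().Attributes().PutStr("service.name", service)

	sm := rm.ScopeMetrics().AppendEmpty()
	sm.Scope().SetName("MyTestInstrument")

	m := sm.Metrics().AppendEmpty()
	m.SetName("metric.test")
	sum := m.SetEmptySum()
	sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
	sum.SetIsMonotonic(true)

	dp := sum.DataPoints().AppendEmpty()
	dp.SetTimestamp(ts)
	dp.SetIntValue(value)
	return md
}

func main() {
	mdA := newBatch("service-test", 50, 333)
	mdB := newBatch("service-test", 60, 444)

	// Merge mdB into mdA; mdA is modified in place and returned, mdB is untouched.
	// Because the resource, scope, and metric identities match, both datapoints
	// end up under a single resourceMetrics/scopeMetrics/metric entry.
	merged := metrics.Merge(mdA, mdB)
	fmt.Println(merged.DataPointCount()) // 2

	// As the doc comment notes, duplicate entries already present in mdA are not
	// collapsed. Merging mdA into a fresh pmetric.Metrics first deduplicates its
	// structure, at the cost of copying everything once:
	//
	//	cleaned := metrics.Merge(pmetric.NewMetrics(), mdA)
	//	metrics.Merge(cleaned, mdB)
}
```

The included benchmarks compare this identity-deduplicating merge against the naive append-only merge, which is the main design trade-off this change makes: more comparisons per merge in exchange for a compact result.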