Skip to content

Commit

Permalink
[internal/exp/metrics] Add functions to merge metrics (open-telemetry…
Browse files Browse the repository at this point in the history
…#32794)

**Description:**

This will merge the metrics in mdB into mdA, trying to re-use
resourceMetrics, scopeMetrics, and metric values as much as possible. This will
be used to help implement the new feature for:
open-telemetry#32513


**Link to tracking Issue:**
open-telemetry#32513
/
open-telemetry#32690

**Testing:**

I created a unit test which tests various scenarios of how merge
behavior should happen

**Documentation:**

The exported function is documented using standard golang style. And
there are comments inside the code to explain what is going on and why

---------

Co-authored-by: Ziqi Zhao <[email protected]>
  • Loading branch information
RichieSams and fatsheep9146 authored May 29, 2024
1 parent 821c864 commit f664dbb
Show file tree
Hide file tree
Showing 12 changed files with 1,025 additions and 3 deletions.
7 changes: 7 additions & 0 deletions internal/exp/metrics/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/me
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.101.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.101.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.101.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/pdata v1.8.1-0.20240527192838-af4fdd4e342a
go.opentelemetry.io/collector/semconv v0.101.1-0.20240527192838-af4fdd4e342a
)

require (
Expand All @@ -27,3 +30,7 @@ require (
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../../pkg/pdatatest
2 changes: 2 additions & 0 deletions internal/exp/metrics/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 124 additions & 0 deletions internal/exp/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics"

import (
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
)

// Merge will merge the metrics data in mdB into mdA, then return mdA.
// mdB will not be modified. The function will attempt to merge the data in mdB into
// existing ResourceMetrics / ScopeMetrics / Metrics in mdA if possible. If they don't
// exist, new entries will be created as needed.
//
// NOTE: Any "unnecessary" duplicate entries in mdA will *not* be combined. For example if
// mdA contains two ResourceMetrics entries with identical Resource values, they will not be
// combined. If you wish to have this behavior, you could call this function twice:
//
//	cleanedMetrics := Merge(pmetric.NewMetrics(), mdA)
//	Merge(cleanedMetrics, mdB)
//
// That said, this will do a large amount of memory copying.
func Merge(mdA pmetric.Metrics, mdB pmetric.Metrics) pmetric.Metrics {
outer:
	for i := 0; i < mdB.ResourceMetrics().Len(); i++ {
		rmB := mdB.ResourceMetrics().At(i)
		resourceIDB := identity.OfResource(rmB.Resource())

		// Try to find an existing ResourceMetrics in mdA with the same identity.
		for j := 0; j < mdA.ResourceMetrics().Len(); j++ {
			rmA := mdA.ResourceMetrics().At(j)
			resourceIDA := identity.OfResource(rmA.Resource())

			if resourceIDA == resourceIDB {
				mergeResourceMetrics(resourceIDA, rmA, rmB)
				continue outer
			}
		}

		// We didn't find a match.
		// Add it to mdA.
		newRM := mdA.ResourceMetrics().AppendEmpty()
		rmB.CopyTo(newRM)
	}

	return mdA
}

// mergeResourceMetrics merges the ScopeMetrics of rmB into rmA, matching
// scopes by identity. Scopes from rmB without a match in rmA are appended
// as new entries. rmA is returned; rmB is not modified.
func mergeResourceMetrics(resourceID identity.Resource, rmA pmetric.ResourceMetrics, rmB pmetric.ResourceMetrics) pmetric.ResourceMetrics {
	scopesA := rmA.ScopeMetrics()
	scopesB := rmB.ScopeMetrics()

	for i := 0; i < scopesB.Len(); i++ {
		smB := scopesB.At(i)
		idB := identity.OfScope(resourceID, smB.Scope())

		merged := false
		for j := 0; j < scopesA.Len(); j++ {
			smA := scopesA.At(j)
			idA := identity.OfScope(resourceID, smA.Scope())
			if idA == idB {
				mergeScopeMetrics(idA, smA, smB)
				merged = true
				break
			}
		}

		if !merged {
			// No existing scope matched; copy the whole ScopeMetrics over.
			smB.CopyTo(scopesA.AppendEmpty())
		}
	}

	return rmA
}

// mergeScopeMetrics merges the Metrics of smB into smA, matching metrics by
// identity. For matched metrics the datapoints of the B-side metric are
// appended onto the A-side metric; unmatched metrics from smB are copied into
// smA wholesale. smA is returned; smB is not modified.
func mergeScopeMetrics(scopeID identity.Scope, smA pmetric.ScopeMetrics, smB pmetric.ScopeMetrics) pmetric.ScopeMetrics {
	metricsA := smA.Metrics()
	metricsB := smB.Metrics()

	for i := 0; i < metricsB.Len(); i++ {
		mB := metricsB.At(i)
		idB := identity.OfMetric(scopeID, mB)

		merged := false
		for j := 0; j < metricsA.Len(); j++ {
			mA := metricsA.At(j)
			if identity.OfMetric(scopeID, mA) != idB {
				continue
			}

			// Matching identity implies matching type, so the same accessor is
			// safe on both sides.
			//exhaustive:enforce
			switch mA.Type() {
			case pmetric.MetricTypeGauge:
				mergeDataPoints(mA.Gauge().DataPoints(), mB.Gauge().DataPoints())
			case pmetric.MetricTypeSum:
				mergeDataPoints(mA.Sum().DataPoints(), mB.Sum().DataPoints())
			case pmetric.MetricTypeHistogram:
				mergeDataPoints(mA.Histogram().DataPoints(), mB.Histogram().DataPoints())
			case pmetric.MetricTypeExponentialHistogram:
				mergeDataPoints(mA.ExponentialHistogram().DataPoints(), mB.ExponentialHistogram().DataPoints())
			case pmetric.MetricTypeSummary:
				mergeDataPoints(mA.Summary().DataPoints(), mB.Summary().DataPoints())
			}

			merged = true
			break
		}

		if !merged {
			// No existing metric matched; copy the metric over as-is.
			mB.CopyTo(metricsA.AppendEmpty())
		}
	}

	return smA
}

// mergeDataPoints appends a copy of every datapoint in dataPointsB onto
// dataPointsA, then returns dataPointsA. dataPointsB is left untouched.
func mergeDataPoints[DPS streams.DataPointSlice[DP], DP streams.DataPoint[DP]](dataPointsA DPS, dataPointsB DPS) DPS {
	for i := 0; i < dataPointsB.Len(); i++ {
		dataPointsB.At(i).CopyTo(dataPointsA.AppendEmpty())
	}

	return dataPointsA
}
177 changes: 177 additions & 0 deletions internal/exp/metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics_test

import (
"math/rand"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
conventions "go.opentelemetry.io/collector/semconv/v1.9.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

// TestMergeMetrics runs Merge against golden-file scenarios and compares the
// result with the expected output.
func TestMergeMetrics(t *testing.T) {
	t.Parallel()

	for _, tc := range []string{
		"basic_merge",
		"a_duplicate_data",
	} {
		tc := tc

		t.Run(tc, func(t *testing.T) {
			t.Parallel()

			dir := filepath.Join("testdata", tc)

			// read loads one golden metrics file from the scenario directory.
			read := func(name string) pmetric.Metrics {
				md, err := golden.ReadMetrics(filepath.Join(dir, name))
				require.NoError(t, err)
				return md
			}

			mdA := read("a.yaml")
			mdB := read("b.yaml")
			expectedOutput := read("output.yaml")

			require.NoError(t, pmetrictest.CompareMetrics(expectedOutput, metrics.Merge(mdA, mdB)))
		})
	}
}

// naiveMerge blindly appends every ResourceMetrics of mdB onto mdA with no
// deduplication, then returns mdA. It is the benchmark baseline for Merge.
func naiveMerge(mdA pmetric.Metrics, mdB pmetric.Metrics) pmetric.Metrics {
	rmsB := mdB.ResourceMetrics()
	for i := 0; i < rmsB.Len(); i++ {
		rmsB.At(i).CopyTo(mdA.ResourceMetrics().AppendEmpty())
	}

	return mdA
}

// BenchmarkMergeManyIntoSingle measures merging a large metrics payload into a
// payload holding a single resource/scope/metric, for both the naive and the
// deduplicating merge.
func BenchmarkMergeManyIntoSingle(b *testing.B) {
	for _, bench := range []struct {
		name  string
		merge func(mdA pmetric.Metrics, mdB pmetric.Metrics) pmetric.Metrics
	}{
		{name: "Naive", merge: naiveMerge},
		{name: "Deduplicating", merge: metrics.Merge},
	} {
		b.Run(bench.name, func(b *testing.B) {
			// mdA is a single resource metric with a single scope metric and a single metric.
			mdAClean := generateMetrics(b, 1)
			mdB := generateMetrics(b, 10000)

			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				// Rebuild a fresh mdA outside the timed region, since merging
				// mutates its first argument.
				b.StopTimer()
				mdA := pmetric.NewMetrics()
				mdAClean.CopyTo(mdA)
				b.StartTimer()

				bench.merge(mdA, mdB)
			}
		})
	}
}

// BenchmarkMergeManyIntoMany measures merging two equally large metrics
// payloads, for both the naive and the deduplicating merge.
func BenchmarkMergeManyIntoMany(b *testing.B) {
	for _, bench := range []struct {
		name  string
		merge func(mdA pmetric.Metrics, mdB pmetric.Metrics) pmetric.Metrics
	}{
		{name: "Naive", merge: naiveMerge},
		{name: "Deduplicating", merge: metrics.Merge},
	} {
		b.Run(bench.name, func(b *testing.B) {
			mdAClean := generateMetrics(b, 10000)
			mdB := generateMetrics(b, 10000)

			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				// Rebuild a fresh mdA outside the timed region, since merging
				// mutates its first argument.
				b.StopTimer()
				mdA := pmetric.NewMetrics()
				mdAClean.CopyTo(mdA)
				b.StartTimer()

				bench.merge(mdA, mdB)
			}
		})
	}
}

// generateMetrics creates a pmetric.Metrics instance with rmCount
// resourceMetrics. Every resource metric is identical to the others, as is
// every scope metric and every metric — only the datapoint timestamps and
// values differ between entries.
func generateMetrics(t require.TestingT, rmCount int) pmetric.Metrics {
	md := pmetric.NewMetrics()

	// Random starting points keep successive calls from producing identical datapoints.
	ts := pcommon.Timestamp(rand.Intn(256))
	val := int64(rand.Intn(256))

	for i := 0; i < rmCount; i++ {
		rm := md.ResourceMetrics().AppendEmpty()
		require.NoError(t, rm.Resource().Attributes().FromRaw(map[string]any{
			conventions.AttributeServiceName: "service-test",
		}))

		sm := rm.ScopeMetrics().AppendEmpty()
		scope := sm.Scope()
		scope.SetName("MyTestInstrument")
		scope.SetVersion("1.2.3")
		require.NoError(t, scope.Attributes().FromRaw(map[string]any{
			"scope.key": "scope-test",
		}))

		m := sm.Metrics().AppendEmpty()
		m.SetName("metric.test")

		sum := m.SetEmptySum()
		sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
		sum.SetIsMonotonic(true)

		dp := sum.DataPoints().AppendEmpty()

		dp.SetTimestamp(ts)
		ts += 10

		dp.SetIntValue(val)
		val += 15

		require.NoError(t, dp.Attributes().FromRaw(map[string]any{
			"datapoint.key": "dp-test",
		}))
	}

	return md
}
18 changes: 17 additions & 1 deletion internal/exp/metrics/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"

import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
import (
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// Sequence of streams that can be iterated upon
type Seq[T any] func(yield func(identity.Stream, T) bool) bool
Expand Down Expand Up @@ -63,3 +67,15 @@ func (m HashMap[T]) Clear() {
type Evictor interface {
Evict() (gone identity.Stream, ok bool)
}

// DataPointSlice is the common subset of methods shared by the pmetric
// datapoint slice types (number, histogram, exponential histogram, summary),
// letting generic code iterate over and append to a slice without knowing the
// concrete type.
type DataPointSlice[DP DataPoint[DP]] interface {
	Len() int
	At(i int) DP
	AppendEmpty() DP
}

// DataPoint is the common subset of methods shared by the individual pmetric
// datapoint types. Self is the concrete datapoint type, so CopyTo is typed
// against a datapoint of the same kind.
type DataPoint[Self any] interface {
	Timestamp() pcommon.Timestamp
	Attributes() pcommon.Map
	CopyTo(dest Self)
}
Loading

0 comments on commit f664dbb

Please sign in to comment.