Skip to content

Commit

Permalink
[v2][adjuster] Implement ip attribute adjuster to operate on otlp dat…
Browse files Browse the repository at this point in the history
…a model (jaegertracing#6355)

## Which problem is this PR solving?
- Towards jaegertracing#6344

## Description of the changes
- This PR implements the `IPTag` adjuster to operate on the OTLP data
model. In the OTLP model, tags are dubbed as attributes so the adjuster
was renamed to `IPAttribute`.

## How was this change tested?
- Added unit tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Dec 13, 2024
1 parent 989dcb0 commit cd99501
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 7 deletions.
14 changes: 7 additions & 7 deletions cmd/query/app/querysvc/adjuster/adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ type Adjuster interface {
}

// Func is a type alias that wraps a function and makes an Adjuster from it.
type Func func(trace ptrace.Traces) (ptrace.Traces, error)
type Func func(traces ptrace.Traces) (ptrace.Traces, error)

// Adjust implements Adjuster interface for the Func alias.
func (f Func) Adjust(trace ptrace.Traces) (ptrace.Traces, error) {
return f(trace)
func (f Func) Adjust(traces ptrace.Traces) (ptrace.Traces, error) {
return f(traces)
}

// Sequence creates an adjuster that combines a series of adjusters
Expand All @@ -44,17 +44,17 @@ type sequence struct {
failFast bool
}

func (c sequence) Adjust(trace ptrace.Traces) (ptrace.Traces, error) {
func (c sequence) Adjust(traces ptrace.Traces) (ptrace.Traces, error) {
var errs []error
for _, adjuster := range c.adjusters {
var err error
trace, err = adjuster.Adjust(trace)
traces, err = adjuster.Adjust(traces)
if err != nil {
if c.failFast {
return trace, err
return traces, err
}
errs = append(errs, err)
}
}
return trace, errors.Join(errs...)
return traces, errors.Join(errs...)
}
77 changes: 77 additions & 0 deletions cmd/query/app/querysvc/adjuster/ipattribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"bytes"
"encoding/binary"
"strconv"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)

var ipAttributesToCorrect = map[string]struct{}{
"ip": {},
"peer.ipv4": {},
}

// IPAttribute returns an adjuster that replaces numeric "ip" attributes,
// which usually contain IPv4 packed into uint32, with their string
// representation (e.g. "8.8.8.8"").
func IPAttribute() Adjuster {
return Func(func(traces ptrace.Traces) (ptrace.Traces, error) {
adjuster := ipAttributeAdjuster{}
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
adjuster.adjust(rs.Resource().Attributes())
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
adjuster.adjust(span.Attributes())
}
}
}
return traces, nil
})
}

type ipAttributeAdjuster struct{}

func (ipAttributeAdjuster) adjust(attributes pcommon.Map) {
adjusted := make(map[string]string)
attributes.Range(func(k string, v pcommon.Value) bool {
if _, ok := ipAttributesToCorrect[k]; !ok {
return true
}
var value uint32
switch v.Type() {
case pcommon.ValueTypeInt:
//nolint: gosec // G115
value = uint32(v.Int())
case pcommon.ValueTypeDouble:
value = uint32(v.Double())
default:
return true
}
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], value)
var sBuf bytes.Buffer
for i, b := range buf {
if i > 0 {
sBuf.WriteRune('.')
}
sBuf.WriteString(strconv.FormatUint(uint64(b), 10))
}
adjusted[k] = sBuf.String()
return true
})
for k, v := range adjusted {
attributes.PutStr(k, v)
}
}
81 changes: 81 additions & 0 deletions cmd/query/app/querysvc/adjuster/ipattribute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestIPAttributeAdjuster(t *testing.T) {
traces := ptrace.NewTraces()
resourceSpans := traces.ResourceSpans().AppendEmpty()

resourceAttributes := resourceSpans.Resource().Attributes()
resourceAttributes.PutInt("a", 42)
resourceAttributes.PutInt("ip", 1<<24|2<<16|3<<8|4)
resourceAttributes.PutStr("peer.ipv4", "something")

spans := resourceSpans.ScopeSpans().AppendEmpty().Spans()

createSpan := func(attrs map[string]any) {
span := spans.AppendEmpty()
for key, value := range attrs {
switch v := value.(type) {
case int:
span.Attributes().PutInt(key, int64(v))
case string:
span.Attributes().PutStr(key, v)
case float64:
span.Attributes().PutDouble(key, v)
}
}
}

createSpan(map[string]any{
"a": 42,
"ip": int(1<<25 | 2<<16 | 3<<8 | 4),
"peer.ipv4": "something else",
})

createSpan(map[string]any{
"ip": float64(1<<26 | 2<<16 | 3<<8 | 4),
})

assertAttribute := func(attributes pcommon.Map, key string, expected any) {
val, ok := attributes.Get(key)
require.True(t, ok)
switch v := expected.(type) {
case int:
require.EqualValues(t, v, val.Int())
case string:
require.EqualValues(t, v, val.Str())
}
}

trace, err := IPAttribute().Adjust(traces)
require.NoError(t, err)

resourceSpan := trace.ResourceSpans().At(0)
assert.Equal(t, 3, resourceSpan.Resource().Attributes().Len())

assertAttribute(resourceSpan.Resource().Attributes(), "a", 42)
assertAttribute(resourceSpan.Resource().Attributes(), "ip", "1.2.3.4")
assertAttribute(resourceSpan.Resource().Attributes(), "peer.ipv4", "something")

gotSpans := resourceSpan.ScopeSpans().At(0).Spans()
assert.Equal(t, 2, gotSpans.Len())

assert.Equal(t, 3, gotSpans.At(0).Attributes().Len())
assertAttribute(gotSpans.At(0).Attributes(), "a", 42)
assertAttribute(gotSpans.At(0).Attributes(), "ip", "2.2.3.4")
assertAttribute(gotSpans.At(0).Attributes(), "peer.ipv4", "something else")

assert.Equal(t, 1, gotSpans.At(1).Attributes().Len())
assertAttribute(gotSpans.At(1).Attributes(), "ip", "4.2.3.4")
}

0 comments on commit cd99501

Please sign in to comment.