diff --git a/cmd/carbonapi/config/config.go b/cmd/carbonapi/config/config.go index 099a0e892..b8b1b0891 100644 --- a/cmd/carbonapi/config/config.go +++ b/cmd/carbonapi/config/config.go @@ -113,6 +113,9 @@ type ConfigType struct { MaxQueryLength uint64 `mapstructure:"maxQueryLength"` CombineMultipleTargetsInOne bool `mapstructure:"combineMultipleTargetsInOne"` + NudgeStartTimeOnAggregation bool `mapstructure:"nudgeStartTimeOnAggregation"` + UseBucketsHighestTimestampOnAggregation bool `mapstructure:"useBucketsHighestTimestampOnAggregation"` + ResponseCache cache.BytesCache `mapstructure:"-" json:"-"` BackendCache cache.BytesCache `mapstructure:"-" json:"-"` diff --git a/cmd/carbonapi/config/init.go b/cmd/carbonapi/config/init.go index 4293ae492..efdf9655c 100644 --- a/cmd/carbonapi/config/init.go +++ b/cmd/carbonapi/config/init.go @@ -26,6 +26,7 @@ import ( fconfig "github.com/go-graphite/carbonapi/expr/functions/config" "github.com/go-graphite/carbonapi/expr/helper" "github.com/go-graphite/carbonapi/expr/rewrite" + tconfig "github.com/go-graphite/carbonapi/expr/types/config" "github.com/go-graphite/carbonapi/limiter" "github.com/go-graphite/carbonapi/pkg/parser" zipperTypes "github.com/go-graphite/carbonapi/zipper/types" @@ -257,6 +258,9 @@ func SetUpConfig(logger *zap.Logger, BuildVersion string) { ) } + tconfig.Config.NudgeStartTimeOnAggregation = Config.NudgeStartTimeOnAggregation + tconfig.Config.UseBucketsHighestTimestampOnAggregation = Config.UseBucketsHighestTimestampOnAggregation + if Config.Listen != "" { listeners := make(map[string]struct{}) for _, l := range Config.Listeners { @@ -401,6 +405,9 @@ func SetUpViper(logger *zap.Logger, configPath *string, exactConfig bool, viperP viper.SetDefault("useCachingDNSResolver", false) viper.SetDefault("logger", map[string]string{}) viper.SetDefault("combineMultipleTargetsInOne", false) + viper.SetDefault("nudgeStartTimeOnAggregation", false) + viper.SetDefault("useBucketsHighestTimestampOnAggregation", false) + viper.AutomaticEnv() var err error diff --git a/doc/configuration.md b/doc/configuration.md index 6c4321e28..3b2a5af22 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -56,6 +56,8 @@ Table of Contents * [For IRONdb](#for-irondb) * [expireDelaySec](#expiredelaysec) * [Example](#example-21) + * [nudgeStartTimeOnAggregation](#nudgestarttimeonaggregation) + * [useBucketsHighestTimestampOnAggregation](#usebucketshighesttimestamponaggregation) # General configuration for carbonapi @@ -940,3 +942,28 @@ Default: 600 (10 minutes) ```yaml expireDelaySec: 10 ``` + +*** +## nudgeStartTimeOnAggregation +Enables nudging the start time of metrics when aggregating to honor MaxDataPoints. +The start time is nudged in such way that timestamps always fall in the same bucket. +This is done by GraphiteWeb, and is useful to avoid jitter in graphs when refreshing the page. + +Default: false + +### Example +```yaml +nudgeStartTimeOnAggregation: true +``` + +*** +## useBucketsHighestTimestampOnAggregation +Enables using the highest timestamp of the buckets when aggregating to honor MaxDataPoints, instead of the lowest timestamp. +This prevents results to appear to predict the future. + +Default: false + +### Example +```yaml +useBucketsHighestTimestampOnAggregation: true +``` diff --git a/expr/types/config/config.go b/expr/types/config/config.go new file mode 100644 index 000000000..a7a2d7d4b --- /dev/null +++ b/expr/types/config/config.go @@ -0,0 +1,16 @@ +package config + +type ConfigType = struct { + // NudgeStartTimeOnAggregation enables nudging the start time of metrics + // when aggregated. The start time is nudged in such way that timestamps + // always fall in the same bucket. This is done by GraphiteWeb, and is + // useful to avoid jitter in graphs when refreshing the page. + NudgeStartTimeOnAggregation bool + + // UseBucketsHighestTimestampOnAggregation enables using the highest timestamp of the + // buckets when aggregating to honor MaxDataPoints, instead of the lowest timestamp. + // This prevents results to appear to predict the future. + UseBucketsHighestTimestampOnAggregation bool +} + +var Config = ConfigType{} diff --git a/expr/types/types.go b/expr/types/types.go index ca30bea52..49de0225b 100644 --- a/expr/types/types.go +++ b/expr/types/types.go @@ -11,6 +11,7 @@ import ( "github.com/go-graphite/carbonapi/expr/consolidations" "github.com/go-graphite/carbonapi/expr/tags" + "github.com/go-graphite/carbonapi/expr/types/config" pbv2 "github.com/go-graphite/protocol/carbonapi_v2_pb" pb "github.com/go-graphite/protocol/carbonapi_v3_pb" pickle "github.com/lomik/og-rek" @@ -141,7 +142,7 @@ func MarshalJSON(results []*MetricData, timestampMultiplier int64, noNullPoints b = append(b, `,"datapoints":[`...) var innerComma bool - t := r.StartTime * timestampMultiplier + t := r.AggregatedStartTime() * timestampMultiplier for _, v := range r.AggregatedValues() { if noNullPoints && math.IsNaN(v) { t += r.AggregatedTimeStep() * timestampMultiplier @@ -330,6 +331,60 @@ func (r *MetricData) AggregatedTimeStep() int64 { return r.StepTime * int64(r.ValuesPerPoint) } +// AggregatedStartTime returns the start time of the aggregated series. +// This can be different from the original start time if NudgeStartTimeOnAggregation +// or UseBucketsHighestTimestampOnAggregation are enabled. +func (r *MetricData) AggregatedStartTime() int64 { + start := r.StartTime + r.nudgePointsCount()*r.StepTime + if config.Config.UseBucketsHighestTimestampOnAggregation { + return start + r.AggregatedTimeStep() - r.StepTime + } + return start +} + +// nudgePointsCount returns the number of points to discard at the beginning of +// the series when aggregating. This is done if NudgeStartTimeOnAggregation is +// enabled, and has the purpose of assigning timestamps of a series to buckets +// consistently across different time ranges. To simplify the aggregation +// logic, we discard points at the beginning of the series so that a bucket +// starts right at the beginning. This function calculates how many points to +// discard. +func (r *MetricData) nudgePointsCount() int64 { + if !config.Config.NudgeStartTimeOnAggregation { + return 0 + } + + if len(r.Values) <= 2*r.ValuesPerPoint { + // There would be less than 2 points after aggregation, removing one would be too impactful. + return 0 + } + + // Suppose r.StartTime=4, r.StepTime=3 and aggTimeStep=6. + // - ts: 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 ... + // - original buckets: -- -- --| | | | | | ... + // - aggregated buckets: -- -- --| | | ... + + // We start counting our aggTimeStep buckets at absolute time r.StepTime. + // Notice the following: + // - ts: 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 ... + // - bucket #: - - - 1 1 1 1 1 1 2 2 2 2 2 2 3 3 ... + // - (ts-step) % aggTimeStep: - - - 0 1 2 3 4 5 0 1 2 3 4 5 0 1 ... + + // Given a timestamp 'ts', we can calculate how far it is from the beginning + // of the nearest bucket to the right by doing: + // * aggTimeStep - ((ts-r.StepTime) % aggTimeStep) + // Using this, we calculate the 'distance' from r.StartTime to the + // nearest bucket to the right. If this distance is less than aggTimeStep, + // then r.StartTime is not the beginning of a bucket. We need to discard + // dist / r.StepTime points (which could be zero if dist < r.StepTime). + aggTimeStep := r.AggregatedTimeStep() + dist := aggTimeStep - ((r.StartTime - r.StepTime) % aggTimeStep) + if dist < aggTimeStep { + return dist / r.StepTime + } + return 0 +} + // GetAggregateFunction returns MetricData.AggregateFunction and set it, if it's not yet func (r *MetricData) GetAggregateFunction() func([]float64) float64 { if r.AggregateFunction == nil { @@ -363,7 +418,8 @@ func (r *MetricData) AggregateValues() { n := len(r.Values)/r.ValuesPerPoint + 1 aggV := make([]float64, 0, n) - v := r.Values + nudgeCount := r.nudgePointsCount() + v := r.Values[nudgeCount:] for len(v) >= r.ValuesPerPoint { val := aggFunc(v[:r.ValuesPerPoint]) diff --git a/expr/types/types_test.go b/expr/types/types_test.go new file mode 100644 index 000000000..888a6f4f1 --- /dev/null +++ b/expr/types/types_test.go @@ -0,0 +1,222 @@ +package types + +import ( + "testing" + + "github.com/go-graphite/carbonapi/expr/types/config" + "github.com/stretchr/testify/assert" +) + +func TestAggregatedValuesNudgedAndHighestTimestamp(t *testing.T) { + + config.Config.NudgeStartTimeOnAggregation = true + config.Config.UseBucketsHighestTimestampOnAggregation = true + + tests := []struct { + name string + values []float64 + step int64 + start int64 + mdp int64 + want []float64 + wantStep int64 + wantStart int64 + }{ + { + name: "empty", + values: []float64{}, + step: 60, + mdp: 100, + want: []float64{}, + wantStep: 60, + wantStart: 0, + }, + { + name: "one point", + values: []float64{1, 2, 3, 4}, + start: 10, + step: 10, + mdp: 1, + want: []float64{10}, + wantStep: 40, + wantStart: 40, + }, + { + name: "no nudge if few points", + values: []float64{1, 2, 3, 4}, + step: 10, + start: 20, + mdp: 1, + want: []float64{10}, + wantStep: 40, + wantStart: 50, + }, + + { + name: "should nudge the first point", + values: []float64{1, 2, 3, 4, 5, 6}, + start: 20, + step: 10, + mdp: 3, + want: []float64{5, 9, 6}, + wantStep: 20, + wantStart: 40, + }, + { + name: "should be stable with previous", + values: []float64{2, 3, 4, 5, 6, 7}, + start: 30, + step: 10, + mdp: 3, + want: []float64{5, 9, 13}, + wantStep: 20, + wantStart: 40, + }, + { + name: "more data", + values: []float64{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, + start: 20, + step: 10, + mdp: 3, + want: []float64{40, 50}, + wantStep: 50, + wantStart: 100, + }, + { + name: "even more data", + values: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10.0, 11, 12, 13, 14}, + start: 10, + step: 10, + mdp: 3, + want: []float64{15, 40, 50}, + wantStep: 50, + wantStart: 50, + }, + { + name: "skewed start time", + values: []float64{2, 3, 4, 5, 6, 7, 8, 9, 10}, + start: 21, + step: 10, + mdp: 5, + want: []float64{2 + 3, 4 + 5, 6 + 7, 8 + 9, 10}, // no points discarded, bucket starts at 20 + wantStep: 20, + wantStart: 31, + }, + { + name: "skewed start time 2", + values: []float64{2, 3, 4, 5, 6, 7, 8, 9, 10}, + start: 29, + step: 10, + mdp: 5, + want: []float64{2 + 3, 4 + 5, 6 + 7, 8 + 9, 10}, // no points discarded, bucket starts at 20 + wantStep: 20, + wantStart: 39, + }, + { + name: "skewed start time 3", + values: []float64{2, 3, 4, 5, 6, 7, 8, 9, 10}, + start: 31, + step: 10, + mdp: 5, + want: []float64{3 + 4, 5 + 6, 7 + 8, 9 + 10}, // 1st point discarded, it belongs to the incomplete bucket (20,40) + wantStep: 20, + wantStart: 51, + }, + { + name: "skewed start time no aggregation", + values: []float64{1, 2, 3, 4}, + start: 31, + step: 10, + mdp: 4, + want: []float64{1, 2, 3, 4}, + wantStep: 10, + wantStart: 31, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + input := MakeMetricData("test", tt.values, tt.step, tt.start) + input.ConsolidationFunc = "sum" + ConsolidateJSON(tt.mdp, []*MetricData{input}) + + got := input.AggregatedValues() + gotStep := input.AggregatedTimeStep() + gotStart := input.AggregatedStartTime() + + assert.Equal(t, tt.want, got, "bad values") + assert.Equal(t, tt.wantStep, gotStep, "bad step") + assert.Equal(t, tt.wantStart, gotStart, "bad start") + }) + } +} + +func TestAggregatedValuesConfigVariants(t *testing.T) { + const start = 20 + const step = 10 + const mdp = 3 + values := []float64{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14} + const expectedStep = int64(50) + /* + ts: | | 20 | 30 | 40 | 50 | 60 | 70 | 80 | 90 | 100 | 110 | 120 | 130 | 140 | + vals: | | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | + unaligned buckets: | | | + aligned buckets: | | | + */ + + tests := []struct { + name string + nudge bool + highestTimestamp bool + want []float64 + wantStart int64 + }{ + { + name: "nudge start and highest timestamp", + nudge: true, + highestTimestamp: true, + want: []float64{40, 50}, + wantStart: 100, + }, + { + name: "nudge start and not highest timestamp", + nudge: true, + highestTimestamp: false, + want: []float64{40, 50}, + wantStart: 60, + }, + { + name: "not nudge start and highest timestamp", + nudge: false, + highestTimestamp: true, + want: []float64{20, 45, 39}, + wantStart: 60, + }, + { + name: "not nudge start and not highest timestamp", + nudge: false, + highestTimestamp: false, + want: []float64{20, 45, 39}, + wantStart: 20, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config.Config.NudgeStartTimeOnAggregation = tt.nudge + config.Config.UseBucketsHighestTimestampOnAggregation = tt.highestTimestamp + + input := MakeMetricData("test", values, step, start) + input.ConsolidationFunc = "sum" + ConsolidateJSON(mdp, []*MetricData{input}) + + got := input.AggregatedValues() + gotStep := input.AggregatedTimeStep() + gotStart := input.AggregatedStartTime() + + assert.Equal(t, tt.want, got, "bad values") + assert.Equal(t, expectedStep, gotStep, "bad step") + assert.Equal(t, tt.wantStart, gotStart, "bad start") + }) + } +}