diff --git a/cmd/es-rollover/app/init/action.go b/cmd/es-rollover/app/init/action.go index dcb7ad4735c..c8a99d98f2e 100644 --- a/cmd/es-rollover/app/init/action.go +++ b/cmd/es-rollover/app/init/action.go @@ -13,6 +13,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/es-rollover/app" "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/filter" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" ) @@ -28,19 +29,15 @@ type Action struct { } func (c Action) getMapping(version uint, templateName string) (string, error) { + c.Config.Indices.IndexPrefix = config.IndexPrefix(c.Config.Config.IndexPrefix) mappingBuilder := mappings.MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - PrioritySpanTemplate: int64(c.Config.PrioritySpanTemplate), - PriorityServiceTemplate: int64(c.Config.PriorityServiceTemplate), - PriorityDependenciesTemplate: int64(c.Config.PriorityDependenciesTemplate), - PrioritySamplingTemplate: int64(c.Config.PrioritySamplingTemplate), - Shards: int64(c.Config.Shards), - Replicas: int64(c.Config.Replicas), - IndexPrefix: c.Config.IndexPrefix, - UseILM: c.Config.UseILM, - ILMPolicyName: c.Config.ILMPolicyName, - EsVersion: version, + TemplateBuilder: es.TextTemplateBuilder{}, + Indices: c.Config.Indices, + UseILM: c.Config.UseILM, + ILMPolicyName: c.Config.ILMPolicyName, + EsVersion: version, } + return mappingBuilder.GetMapping(templateName) } @@ -62,7 +59,7 @@ func (c Action) Do() error { return fmt.Errorf("ILM policy %s doesn't exist in Elasticsearch. Please create it and re-run init", c.Config.ILMPolicyName) } } - rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.AdaptiveSampling, c.Config.IndexPrefix) + rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.AdaptiveSampling, c.Config.Config.IndexPrefix) for _, indexName := range rolloverIndices { if err := c.init(version, indexName); err != nil { return err @@ -114,7 +111,7 @@ func (c Action) init(version uint, indexopt app.IndexOption) error { return err } - jaegerIndices, err := c.IndicesClient.GetJaegerIndices(c.Config.IndexPrefix) + jaegerIndices, err := c.IndicesClient.GetJaegerIndices(c.Config.Config.IndexPrefix) if err != nil { return err } diff --git a/cmd/es-rollover/app/init/flags.go b/cmd/es-rollover/app/init/flags.go index dbe969fbe3d..a5b17d2d1df 100644 --- a/cmd/es-rollover/app/init/flags.go +++ b/cmd/es-rollover/app/init/flags.go @@ -9,6 +9,7 @@ import ( "github.com/spf13/viper" "github.com/jaegertracing/jaeger/cmd/es-rollover/app" + cfg "github.com/jaegertracing/jaeger/pkg/es/config" ) const ( @@ -21,14 +22,10 @@ const ( ) // Config holds configuration for index cleaner binary. +// Config.IndexPrefix supersedes Indices.IndexPrefix type Config struct { app.Config - Shards int - Replicas int - PrioritySpanTemplate int - PriorityServiceTemplate int - PriorityDependenciesTemplate int - PrioritySamplingTemplate int + cfg.Indices } // AddFlags adds flags for TLS to the FlagSet. @@ -43,10 +40,18 @@ func (*Config) AddFlags(flags *flag.FlagSet) { // InitFromViper initializes config from viper.Viper. func (c *Config) InitFromViper(v *viper.Viper) { - c.Shards = v.GetInt(shards) - c.Replicas = v.GetInt(replicas) - c.PrioritySpanTemplate = v.GetInt(prioritySpanTemplate) - c.PriorityServiceTemplate = v.GetInt(priorityServiceTemplate) - c.PriorityDependenciesTemplate = v.GetInt(priorityDependenciesTemplate) - c.PrioritySamplingTemplate = v.GetInt(prioritySamplingTemplate) + c.Indices.Spans.Shards = v.GetInt64(shards) + c.Indices.Services.Shards = v.GetInt64(shards) + c.Indices.Dependencies.Shards = v.GetInt64(shards) + c.Indices.Sampling.Shards = v.GetInt64(shards) + + c.Indices.Spans.Replicas = v.GetInt64(replicas) + c.Indices.Services.Replicas = v.GetInt64(replicas) + c.Indices.Dependencies.Replicas = v.GetInt64(replicas) + c.Indices.Sampling.Replicas = v.GetInt64(replicas) + + c.Indices.Spans.Priority = v.GetInt64(prioritySpanTemplate) + c.Indices.Services.Priority = v.GetInt64(priorityServiceTemplate) + c.Indices.Dependencies.Priority = v.GetInt64(priorityDependenciesTemplate) + c.Indices.Sampling.Priority = v.GetInt64(prioritySamplingTemplate) } diff --git a/cmd/es-rollover/app/init/flags_test.go b/cmd/es-rollover/app/init/flags_test.go index e3dbe54c94f..3409cffdc1f 100644 --- a/cmd/es-rollover/app/init/flags_test.go +++ b/cmd/es-rollover/app/init/flags_test.go @@ -33,10 +33,10 @@ func TestBindFlags(t *testing.T) { require.NoError(t, err) c.InitFromViper(v) - assert.Equal(t, 8, c.Shards) - assert.Equal(t, 16, c.Replicas) - assert.Equal(t, 300, c.PrioritySpanTemplate) - assert.Equal(t, 301, c.PriorityServiceTemplate) - assert.Equal(t, 302, c.PriorityDependenciesTemplate) - assert.Equal(t, 303, c.PrioritySamplingTemplate) + assert.EqualValues(t, 8, c.Indices.Spans.Shards) + assert.EqualValues(t, 16, c.Indices.Spans.Replicas) + assert.EqualValues(t, 300, c.Indices.Spans.Priority) + assert.EqualValues(t, 301, c.Indices.Services.Priority) + assert.EqualValues(t, 302, c.Indices.Dependencies.Priority) + assert.EqualValues(t, 303, c.Indices.Sampling.Priority) } diff --git a/cmd/esmapping-generator/app/flags_test.go b/cmd/esmapping-generator/app/flags_test.go index 5189685b4d6..3bd64cbefb1 100644 --- a/cmd/esmapping-generator/app/flags_test.go +++ b/cmd/esmapping-generator/app/flags_test.go @@ -20,8 +20,9 @@ func TestOptionsWithDefaultFlags(t *testing.T) { assert.Equal(t, "", o.Mapping) assert.Equal(t, uint(7), o.EsVersion) - assert.Equal(t, int64(5), o.Shards) - assert.Equal(t, int64(1), o.Replicas) + assert.EqualValues(t, 5, o.Shards) + assert.EqualValues(t, 1, o.Replicas) + assert.Equal(t, "", o.IndexPrefix) assert.Equal(t, "false", o.UseILM) assert.Equal(t, "jaeger-ilm-policy", o.ILMPolicyName) diff --git a/cmd/esmapping-generator/app/renderer/render.go b/cmd/esmapping-generator/app/renderer/render.go index 7cb3c89f49a..ed13ac168c5 100644 --- a/cmd/esmapping-generator/app/renderer/render.go +++ b/cmd/esmapping-generator/app/renderer/render.go @@ -8,6 +8,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/esmapping-generator/app" "github.com/jaegertracing/jaeger/pkg/es" + cfg "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" ) @@ -25,14 +26,22 @@ func GetMappingAsString(builder es.TemplateBuilder, opt *app.Options) (string, e return "", err } + indexOpts := cfg.IndexOptions{ + Shards: opt.Shards, + Replicas: opt.Replicas, + } mappingBuilder := mappings.MappingBuilder{ TemplateBuilder: builder, - Shards: opt.Shards, - Replicas: opt.Replicas, - EsVersion: opt.EsVersion, - IndexPrefix: opt.IndexPrefix, - UseILM: enableILM, - ILMPolicyName: opt.ILMPolicyName, + Indices: cfg.Indices{ + IndexPrefix: cfg.IndexPrefix(opt.IndexPrefix), + Spans: indexOpts, + Services: indexOpts, + Dependencies: indexOpts, + Sampling: indexOpts, + }, + EsVersion: opt.EsVersion, + UseILM: enableILM, + ILMPolicyName: opt.ILMPolicyName, } return mappingBuilder.GetMapping(opt.Mapping) } diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index dbf5c0375bd..bae543e39ee 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -22,10 +22,32 @@ extensions: backends: some_storage: elasticsearch: - index_prefix: "jaeger-main" + indices: + index_prefix: "jaeger-main" + spans: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + services: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + dependencies: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + sampling: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 another_storage: elasticsearch: - index_prefix: "jaeger-archive" + indices: + index_prefix: "jaeger-archive" receivers: otlp: diff --git a/cmd/jaeger/config-opensearch.yaml b/cmd/jaeger/config-opensearch.yaml index f4d3085179f..e6b65e121e5 100644 --- a/cmd/jaeger/config-opensearch.yaml +++ b/cmd/jaeger/config-opensearch.yaml @@ -22,11 +22,32 @@ extensions: backends: some_storage: opensearch: - index_prefix: "jaeger-main" - + indices: + index_prefix: "jaeger-main" + spans: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + services: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + dependencies: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + sampling: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 another_storage: opensearch: - index_prefix: "jaeger-main" + indices: + index_prefix: "jaeger-archive" receivers: otlp: diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 129e10c4dc9..02b80380ede 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -33,48 +33,72 @@ import ( storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" ) +const ( + IndexPrefixSeparator = "-" +) + +// IndexOptions describes the index format and rollover frequency +type IndexOptions struct { + Priority int64 `mapstructure:"priority"` + DateLayout string `mapstructure:"date_layout"` + Shards int64 `mapstructure:"shards"` + Replicas int64 `mapstructure:"replicas"` + RolloverFrequency string `mapstructure:"rollover_frequency"` // "hour" or "day" +} + +// Indices describes different configuration options for each index type +type Indices struct { + IndexPrefix IndexPrefix `mapstructure:"index_prefix"` + Spans IndexOptions `mapstructure:"spans"` + Services IndexOptions `mapstructure:"services"` + Dependencies IndexOptions `mapstructure:"dependencies"` + Sampling IndexOptions `mapstructure:"sampling"` +} + +type IndexPrefix string + +func (p IndexPrefix) Apply(indexName string) string { + ps := string(p) + if ps == "" { + return indexName + } + if strings.HasSuffix(ps, IndexPrefixSeparator) { + return ps + indexName + } + return ps + IndexPrefixSeparator + indexName +} + // Configuration describes the configuration properties needed to connect to an ElasticSearch cluster type Configuration struct { - Servers []string `mapstructure:"server_urls" valid:"required,url"` - RemoteReadClusters []string `mapstructure:"remote_read_clusters"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password" json:"-"` - TokenFilePath string `mapstructure:"token_file"` - PasswordFilePath string `mapstructure:"password_file"` - AllowTokenFromContext bool `mapstructure:"-"` - Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing - SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` - MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query - MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads - NumShards int64 `mapstructure:"num_shards"` - NumReplicas int64 `mapstructure:"num_replicas"` - PrioritySpanTemplate int64 `mapstructure:"priority_span_template"` - PriorityServiceTemplate int64 `mapstructure:"priority_service_template"` - PriorityDependenciesTemplate int64 `mapstructure:"priority_dependencies_template"` - Timeout time.Duration `mapstructure:"-"` - BulkSize int `mapstructure:"-"` - BulkWorkers int `mapstructure:"-"` - BulkActions int `mapstructure:"-"` - BulkFlushInterval time.Duration `mapstructure:"-"` - IndexPrefix string `mapstructure:"index_prefix"` - IndexDateLayoutSpans string `mapstructure:"-"` - IndexDateLayoutServices string `mapstructure:"-"` - IndexDateLayoutSampling string `mapstructure:"-"` - IndexDateLayoutDependencies string `mapstructure:"-"` - IndexRolloverFrequencySpans string `mapstructure:"-"` - IndexRolloverFrequencyServices string `mapstructure:"-"` - IndexRolloverFrequencySampling string `mapstructure:"-"` - ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"` - AdaptiveSamplingLookback time.Duration `mapstructure:"-"` - Tags TagsAsFields `mapstructure:"tags_as_fields"` - Enabled bool `mapstructure:"-"` - TLS tlscfg.Options `mapstructure:"tls"` - UseReadWriteAliases bool `mapstructure:"use_aliases"` - CreateIndexTemplates bool `mapstructure:"create_mappings"` - UseILM bool `mapstructure:"use_ilm"` - Version uint `mapstructure:"version"` - LogLevel string `mapstructure:"log_level"` - SendGetBodyAs string `mapstructure:"send_get_body_as"` + Servers []string `mapstructure:"server_urls" valid:"required,url"` + RemoteReadClusters []string `mapstructure:"remote_read_clusters"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + TokenFilePath string `mapstructure:"token_file"` + PasswordFilePath string `mapstructure:"password_file"` + AllowTokenFromContext bool `mapstructure:"-"` + Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing + SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` + MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query + MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads + Timeout time.Duration `mapstructure:"-"` + BulkSize int `mapstructure:"-"` + BulkWorkers int `mapstructure:"-"` + BulkActions int `mapstructure:"-"` + BulkFlushInterval time.Duration `mapstructure:"-"` + Indices Indices `mapstructure:"indices"` + ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"` + AdaptiveSamplingLookback time.Duration `mapstructure:"-"` + Tags TagsAsFields `mapstructure:"tags_as_fields"` + Enabled bool `mapstructure:"-"` + // TODO: migration to OTEL's TLS configuration + TLS tlscfg.Options `mapstructure:"tls"` + UseReadWriteAliases bool `mapstructure:"use_aliases"` + CreateIndexTemplates bool `mapstructure:"create_mappings"` + UseILM bool `mapstructure:"use_ilm"` + Version uint `mapstructure:"version"` + LogLevel string `mapstructure:"log_level"` + SendGetBodyAs string `mapstructure:"send_get_body_as"` } // TagsAsFields holds configuration for tag schema. @@ -208,6 +232,28 @@ func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, err return esV8.NewClient(options) } +func setDefaultIndexOptions(target, source *IndexOptions) { + if target.Shards == 0 { + target.Shards = source.Shards + } + + if target.Replicas == 0 { + target.Replicas = source.Replicas + } + + if target.Priority == 0 { + target.Priority = source.Priority + } + + if target.DateLayout == "" { + target.DateLayout = source.DateLayout + } + + if target.RolloverFrequency == "" { + target.RolloverFrequency = source.RolloverFrequency + } +} + // ApplyDefaults copies settings from source unless its own value is non-zero. func (c *Configuration) ApplyDefaults(source *Configuration) { if len(c.RemoteReadClusters) == 0 { @@ -228,21 +274,14 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.AdaptiveSamplingLookback == 0 { c.AdaptiveSamplingLookback = source.AdaptiveSamplingLookback } - if c.NumShards == 0 { - c.NumShards = source.NumShards - } - if c.NumReplicas == 0 { - c.NumReplicas = source.NumReplicas - } - if c.PrioritySpanTemplate == 0 { - c.PrioritySpanTemplate = source.PrioritySpanTemplate - } - if c.PriorityServiceTemplate == 0 { - c.PriorityServiceTemplate = source.PriorityServiceTemplate - } - if c.PrioritySpanTemplate == 0 { - c.PriorityDependenciesTemplate = source.PriorityDependenciesTemplate + if c.Indices.IndexPrefix == "" { + c.Indices.IndexPrefix = source.Indices.IndexPrefix } + + setDefaultIndexOptions(&c.Indices.Spans, &source.Indices.Spans) + setDefaultIndexOptions(&c.Indices.Services, &source.Indices.Services) + setDefaultIndexOptions(&c.Indices.Dependencies, &source.Indices.Dependencies) + if c.BulkSize == 0 { c.BulkSize = source.BulkSize } @@ -281,23 +320,8 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { } } -// GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration -func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration { - return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySpans) -} - -// GetIndexRolloverFrequencyServicesDuration returns jaeger-service index rollover frequency duration -func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duration { - return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencyServices) -} - -// GetIndexRolloverFrequencySamplingDuration returns jaeger-sampling index rollover frequency duration -func (c *Configuration) GetIndexRolloverFrequencySamplingDuration() time.Duration { - return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySampling) -} - -// GetIndexRolloverFrequencyDuration returns the index rollover frequency duration for the given frequency string -func getIndexRolloverFrequencyDuration(frequency string) time.Duration { +// RolloverFrequencyAsNegativeDuration returns the index rollover frequency duration for the given frequency string +func RolloverFrequencyAsNegativeDuration(frequency string) time.Duration { if frequency == "hour" { return -1 * time.Hour } diff --git a/pkg/es/config/config_test.go b/pkg/es/config/config_test.go index 1c63cf93eed..0005301ee08 100644 --- a/pkg/es/config/config_test.go +++ b/pkg/es/config/config_test.go @@ -316,26 +316,40 @@ func TestNewClient(t *testing.T) { func TestApplyDefaults(t *testing.T) { source := &Configuration{ - RemoteReadClusters: []string{"cluster1", "cluster2"}, - Username: "sourceUser", - Password: "sourcePass", - Sniffer: true, - MaxSpanAge: 100, - AdaptiveSamplingLookback: 50, - NumShards: 5, - NumReplicas: 1, - PrioritySpanTemplate: 10, - PriorityServiceTemplate: 20, - PriorityDependenciesTemplate: 30, - BulkSize: 1000, - BulkWorkers: 10, - BulkActions: 100, - BulkFlushInterval: 30, - SnifferTLSEnabled: true, - Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, - MaxDocCount: 10000, - LogLevel: "info", - SendGetBodyAs: "json", + RemoteReadClusters: []string{"cluster1", "cluster2"}, + Username: "sourceUser", + Password: "sourcePass", + Sniffer: true, + MaxSpanAge: 100, + AdaptiveSamplingLookback: 50, + Indices: Indices{ + IndexPrefix: "hello", + Spans: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 10, + }, + Services: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 20, + }, + Dependencies: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 30, + }, + Sampling: IndexOptions{}, + }, + BulkSize: 1000, + BulkWorkers: 10, + BulkActions: 100, + BulkFlushInterval: 30, + SnifferTLSEnabled: true, + Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, + MaxDocCount: 10000, + LogLevel: "info", + SendGetBodyAs: "json", } tests := []struct { @@ -346,66 +360,104 @@ func TestApplyDefaults(t *testing.T) { { name: "All Defaults Applied except PriorityDependenciesTemplate", target: &Configuration{ - PriorityDependenciesTemplate: 30, + Indices: Indices{ + Dependencies: IndexOptions{ + Priority: 30, + }, + }, }, // All fields are empty expected: source, }, { name: "Some Defaults Applied", target: &Configuration{ - RemoteReadClusters: []string{"customCluster"}, - Username: "customUser", - PrioritySpanTemplate: 10, - PriorityServiceTemplate: 20, - PriorityDependenciesTemplate: 30, + RemoteReadClusters: []string{"customCluster"}, + Username: "customUser", + Indices: Indices{ + Spans: IndexOptions{ + Priority: 10, + }, + Services: IndexOptions{ + Priority: 20, + }, + Dependencies: IndexOptions{ + Priority: 30, + }, + }, // Other fields left default }, expected: &Configuration{ - RemoteReadClusters: []string{"customCluster"}, - Username: "customUser", - Password: "sourcePass", - Sniffer: true, - SnifferTLSEnabled: true, - MaxSpanAge: 100, - AdaptiveSamplingLookback: 50, - NumShards: 5, - NumReplicas: 1, - PrioritySpanTemplate: 10, - PriorityServiceTemplate: 20, - PriorityDependenciesTemplate: 30, - BulkSize: 1000, - BulkWorkers: 10, - BulkActions: 100, - BulkFlushInterval: 30, - Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, - MaxDocCount: 10000, - LogLevel: "info", - SendGetBodyAs: "json", + RemoteReadClusters: []string{"customCluster"}, + Username: "customUser", + Password: "sourcePass", + Sniffer: true, + SnifferTLSEnabled: true, + MaxSpanAge: 100, + AdaptiveSamplingLookback: 50, + Indices: Indices{ + IndexPrefix: "hello", + Spans: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 10, + }, + Services: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 20, + }, + Dependencies: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 30, + }, + }, + BulkSize: 1000, + BulkWorkers: 10, + BulkActions: 100, + BulkFlushInterval: 30, + Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, + MaxDocCount: 10000, + LogLevel: "info", + SendGetBodyAs: "json", }, }, { name: "No Defaults Applied", target: &Configuration{ - RemoteReadClusters: []string{"cluster1", "cluster2"}, - Username: "sourceUser", - Password: "sourcePass", - Sniffer: true, - MaxSpanAge: 100, - AdaptiveSamplingLookback: 50, - NumShards: 5, - NumReplicas: 1, - PrioritySpanTemplate: 10, - PriorityServiceTemplate: 20, - PriorityDependenciesTemplate: 30, - BulkSize: 1000, - BulkWorkers: 10, - BulkActions: 100, - BulkFlushInterval: 30, - SnifferTLSEnabled: true, - Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, - MaxDocCount: 10000, - LogLevel: "info", - SendGetBodyAs: "json", + RemoteReadClusters: []string{"cluster1", "cluster2"}, + Username: "sourceUser", + Password: "sourcePass", + Sniffer: true, + MaxSpanAge: 100, + AdaptiveSamplingLookback: 50, + Indices: Indices{ + IndexPrefix: "hello", + Spans: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 10, + }, + Services: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 20, + }, + Dependencies: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 30, + }, + }, + BulkSize: 1000, + BulkWorkers: 10, + BulkActions: 100, + BulkFlushInterval: 30, + SnifferTLSEnabled: true, + Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, + MaxDocCount: 10000, + LogLevel: "info", + SendGetBodyAs: "json", }, expected: source, }, @@ -504,7 +556,7 @@ func TestTagKeysAsFields(t *testing.T) { } } -func TestGetIndexRolloverFrequencySpansDuration(t *testing.T) { +func TestRolloverFrequencyAsNegativeDuration(t *testing.T) { tests := []struct { name string indexFrequency string @@ -529,72 +581,7 @@ func TestGetIndexRolloverFrequencySpansDuration(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - c := &Configuration{IndexRolloverFrequencySpans: test.indexFrequency} - got := c.GetIndexRolloverFrequencySpansDuration() - require.Equal(t, test.expected, got) - }) - } -} - -func TestGetIndexRolloverFrequencyServicesDuration(t *testing.T) { - tests := []struct { - name string - indexFrequency string - expected time.Duration - }{ - { - name: "hourly jaeger-service", - indexFrequency: "hour", - expected: -1 * time.Hour, - }, - { - name: "daily jaeger-service", - indexFrequency: "daily", - expected: -24 * time.Hour, - }, - { - name: "empty jaeger-service", - indexFrequency: "", - expected: -24 * time.Hour, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - c := &Configuration{IndexRolloverFrequencyServices: test.indexFrequency} - got := c.GetIndexRolloverFrequencyServicesDuration() - require.Equal(t, test.expected, got) - }) - } -} - -func TestGetIndexRolloverFrequencySamplingDuration(t *testing.T) { - tests := []struct { - name string - indexFrequency string - expected time.Duration - }{ - { - name: "hourly jaeger-sampling", - indexFrequency: "hour", - expected: -1 * time.Hour, - }, - { - name: "daily jaeger-sampling", - indexFrequency: "daily", - expected: -24 * time.Hour, - }, - { - name: "empty jaeger-sampling", - indexFrequency: "", - expected: -24 * time.Hour, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - c := &Configuration{IndexRolloverFrequencySampling: test.indexFrequency} - got := c.GetIndexRolloverFrequencySamplingDuration() + got := RolloverFrequencyAsNegativeDuration(test.indexFrequency) require.Equal(t, test.expected, got) }) } @@ -632,6 +619,47 @@ func TestValidate(t *testing.T) { } } +func TestApplyForIndexPrefix(t *testing.T) { + tests := []struct { + testName string + prefix IndexPrefix + name string + expectedName string + }{ + { + testName: "no prefix", + prefix: "", + name: "hello", + expectedName: "hello", + }, + { + testName: "empty name", + prefix: "bye", + name: "", + expectedName: "bye-", + }, + { + testName: "separator suffix", + prefix: "bye-", + name: "hello", + expectedName: "bye-hello", + }, + { + testName: "no separator suffix", + prefix: "bye", + name: "hello", + expectedName: "bye-hello", + }, + } + + for _, test := range tests { + t.Run(test.testName, func(t *testing.T) { + got := test.prefix.Apply(test.name) + require.Equal(t, test.expectedName, got) + }) + } +} + func TestMain(m *testing.M) { testutils.VerifyGoLeaks(m) } diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 7bb0705cf54..ffceb95c98c 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -16,13 +16,13 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore/dbmodel" ) const ( - dependencyType = "dependencies" - dependencyIndex = "jaeger-dependencies-" - indexPrefixSeparator = "-" + dependencyType = "dependencies" + dependencyIndexBaseName = "jaeger-dependencies-" ) // DependencyStore handles all queries and insertions to ElasticSearch dependencies @@ -39,7 +39,7 @@ type DependencyStore struct { type Params struct { Client func() es.Client Logger *zap.Logger - IndexPrefix string + IndexPrefix config.IndexPrefix IndexDateLayout string MaxDocCount int UseReadWriteAliases bool @@ -50,20 +50,13 @@ func NewDependencyStore(p Params) *DependencyStore { return &DependencyStore{ client: p.Client, logger: p.Logger, - dependencyIndexPrefix: prefixIndexName(p.IndexPrefix, dependencyIndex), + dependencyIndexPrefix: p.IndexPrefix.Apply(dependencyIndexBaseName), indexDateLayout: p.IndexDateLayout, maxDocCount: p.MaxDocCount, useReadWriteAliases: p.UseReadWriteAliases, } } -func prefixIndexName(prefix, index string) string { - if prefix != "" { - return prefix + indexPrefixSeparator + index - } - return index -} - // WriteDependencies implements dependencystore.Writer#WriteDependencies. func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error { writeIndexName := s.getWriteIndex(ts) diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index c141965186b..8ffc021eb82 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -20,6 +20,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/storage/dependencystore" @@ -34,7 +35,7 @@ type depStorageTest struct { storage *DependencyStore } -func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn func(r *depStorageTest)) { +func withDepStorage(indexPrefix config.IndexPrefix, indexDateLayout string, maxDocCount int, fn func(r *depStorageTest)) { client := &mocks.Client{} logger, logBuffer := testutils.NewLogger() r := &depStorageTest{ @@ -59,7 +60,7 @@ var ( func TestNewSpanReaderIndexPrefix(t *testing.T) { testCases := []struct { - prefix string + prefix config.IndexPrefix expected string }{ {prefix: "", expected: ""}, @@ -76,7 +77,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { MaxDocCount: defaultMaxDocCount, }) - assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix) + assert.Equal(t, testCase.expected+dependencyIndexBaseName, r.dependencyIndexPrefix) } } @@ -131,7 +132,7 @@ func TestGetDependencies(t *testing.T) { searchError error expectedError string expectedOutput []model.DependencyLink - indexPrefix string + indexPrefix config.IndexPrefix maxDocCount int indices []any }{ @@ -211,37 +212,37 @@ func TestGetReadIndices(t *testing.T) { params: Params{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: true}, lookback: 23 * time.Hour, indices: []string{ - dependencyIndex + "read", + dependencyIndexBaseName + "read", }, }, { params: Params{IndexPrefix: "", IndexDateLayout: "2006-01-02"}, lookback: 23 * time.Hour, indices: []string{ - dependencyIndex + fixedTime.Format("2006-01-02"), - dependencyIndex + fixedTime.Add(-23*time.Hour).Format("2006-01-02"), + dependencyIndexBaseName + fixedTime.Format("2006-01-02"), + dependencyIndexBaseName + fixedTime.Add(-23*time.Hour).Format("2006-01-02"), }, }, { params: Params{IndexPrefix: "", IndexDateLayout: "2006-01-02"}, lookback: 13 * time.Hour, indices: []string{ - dependencyIndex + fixedTime.UTC().Format("2006-01-02"), - dependencyIndex + fixedTime.Add(-13*time.Hour).Format("2006-01-02"), + dependencyIndexBaseName + fixedTime.UTC().Format("2006-01-02"), + dependencyIndexBaseName + fixedTime.Add(-13*time.Hour).Format("2006-01-02"), }, }, { params: Params{IndexPrefix: "foo:", IndexDateLayout: "2006-01-02"}, lookback: 1 * time.Hour, indices: []string{ - "foo:" + indexPrefixSeparator + dependencyIndex + fixedTime.Format("2006-01-02"), + "foo:" + config.IndexPrefixSeparator + dependencyIndexBaseName + fixedTime.Format("2006-01-02"), }, }, { params: Params{IndexPrefix: "foo-", IndexDateLayout: "2006-01-02"}, lookback: 0, indices: []string{ - "foo-" + indexPrefixSeparator + dependencyIndex + fixedTime.Format("2006-01-02"), + "foo" + config.IndexPrefixSeparator + dependencyIndexBaseName + fixedTime.Format("2006-01-02"), }, }, } @@ -260,11 +261,11 @@ func TestGetWriteIndex(t *testing.T) { }{ { params: Params{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: true}, - writeIndex: dependencyIndex + "write", + writeIndex: dependencyIndexBaseName + "write", }, { params: Params{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: false}, - writeIndex: dependencyIndex + fixedTime.Format("2006-01-02"), + writeIndex: dependencyIndexBaseName + fixedTime.Format("2006-01-02"), }, } for _, testCase := range testCases { diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 1a9874dc0c6..a403ccf5388 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -221,21 +221,19 @@ func createSpanReader( return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ - Client: clientFn, - MaxDocCount: cfg.MaxDocCount, - MaxSpanAge: cfg.MaxSpanAge, - IndexPrefix: cfg.IndexPrefix, - SpanIndexDateLayout: cfg.IndexDateLayoutSpans, - ServiceIndexDateLayout: cfg.IndexDateLayoutServices, - SpanIndexRolloverFrequency: cfg.GetIndexRolloverFrequencySpansDuration(), - ServiceIndexRolloverFrequency: cfg.GetIndexRolloverFrequencyServicesDuration(), - TagDotReplacement: cfg.Tags.DotReplacement, - UseReadWriteAliases: cfg.UseReadWriteAliases, - Archive: archive, - RemoteReadClusters: cfg.RemoteReadClusters, - Logger: logger, - MetricsFactory: mFactory, - Tracer: tp.Tracer("esSpanStore.SpanReader"), + Client: clientFn, + MaxDocCount: cfg.MaxDocCount, + MaxSpanAge: cfg.MaxSpanAge, + IndexPrefix: cfg.Indices.IndexPrefix, + SpanIndex: cfg.Indices.Spans, + ServiceIndex: cfg.Indices.Services, + TagDotReplacement: cfg.Tags.DotReplacement, + UseReadWriteAliases: cfg.UseReadWriteAliases, + Archive: archive, + RemoteReadClusters: cfg.RemoteReadClusters, + Logger: logger, + MetricsFactory: mFactory, + Tracer: tp.Tracer("esSpanStore.SpanReader"), }), nil } @@ -257,18 +255,18 @@ func createSpanWriter( } writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ - Client: clientFn, - IndexPrefix: cfg.IndexPrefix, - SpanIndexDateLayout: cfg.IndexDateLayoutSpans, - ServiceIndexDateLayout: cfg.IndexDateLayoutServices, - AllTagsAsFields: cfg.Tags.AllAsFields, - TagKeysAsFields: tags, - TagDotReplacement: cfg.Tags.DotReplacement, - Archive: archive, - UseReadWriteAliases: cfg.UseReadWriteAliases, - Logger: logger, - MetricsFactory: mFactory, - ServiceCacheTTL: cfg.ServiceCacheTTL, + Client: clientFn, + IndexPrefix: cfg.Indices.IndexPrefix, + SpanIndex: cfg.Indices.Spans, + ServiceIndex: cfg.Indices.Services, + AllTagsAsFields: cfg.Tags.AllAsFields, + TagKeysAsFields: tags, + TagDotReplacement: cfg.Tags.DotReplacement, + Archive: archive, + UseReadWriteAliases: cfg.UseReadWriteAliases, + Logger: logger, + MetricsFactory: mFactory, + ServiceCacheTTL: cfg.ServiceCacheTTL, }) // Creating a template here would conflict with the one created for ILM resulting to no index rollover @@ -278,7 +276,7 @@ func createSpanWriter( if err != nil { return nil, err } - if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix); err != nil { + if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.Indices.IndexPrefix); err != nil { return nil, err } } @@ -289,9 +287,9 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store params := esSampleStore.Params{ Client: f.getPrimaryClient, Logger: f.logger, - IndexPrefix: f.primaryConfig.IndexPrefix, - IndexDateLayout: f.primaryConfig.IndexDateLayoutSampling, - IndexRolloverFrequency: f.primaryConfig.GetIndexRolloverFrequencySamplingDuration(), + IndexPrefix: f.primaryConfig.Indices.IndexPrefix, + IndexDateLayout: f.primaryConfig.Indices.Sampling.DateLayout, + IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.primaryConfig.Indices.Sampling.RolloverFrequency), Lookback: f.primaryConfig.AdaptiveSamplingLookback, MaxDocCount: f.primaryConfig.MaxDocCount, } @@ -313,15 +311,10 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store func mappingBuilderFromConfig(cfg *config.Configuration) mappings.MappingBuilder { return mappings.MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - Shards: cfg.NumShards, - Replicas: cfg.NumReplicas, - EsVersion: cfg.Version, - IndexPrefix: cfg.IndexPrefix, - UseILM: cfg.UseILM, - PrioritySpanTemplate: cfg.PrioritySpanTemplate, - PriorityServiceTemplate: cfg.PriorityServiceTemplate, - PriorityDependenciesTemplate: cfg.PriorityDependenciesTemplate, + TemplateBuilder: es.TextTemplateBuilder{}, + Indices: cfg.Indices, + EsVersion: cfg.Version, + UseILM: cfg.UseILM, } } @@ -333,8 +326,8 @@ func createDependencyReader( reader := esDepStore.NewDependencyStore(esDepStore.Params{ Client: clientFn, Logger: logger, - IndexPrefix: cfg.IndexPrefix, - IndexDateLayout: cfg.IndexDateLayoutDependencies, + IndexPrefix: cfg.Indices.IndexPrefix, + IndexDateLayout: cfg.Indices.Dependencies.DateLayout, MaxDocCount: cfg.MaxDocCount, UseReadWriteAliases: cfg.UseReadWriteAliases, }) diff --git a/plugin/storage/es/mappings/jaeger-dependencies-8.json b/plugin/storage/es/mappings/jaeger-dependencies-8.json index 57767866284..e06d9826a13 100644 --- a/plugin/storage/es/mappings/jaeger-dependencies-8.json +++ b/plugin/storage/es/mappings/jaeger-dependencies-8.json @@ -1,5 +1,5 @@ { - "priority": {{ .PriorityDependenciesTemplate }}, + "priority": {{ .Priority }}, "index_patterns": "{{ .IndexPrefix }}jaeger-dependencies-*", "template": { {{- if .UseILM }} diff --git a/plugin/storage/es/mappings/jaeger-sampling-8.json b/plugin/storage/es/mappings/jaeger-sampling-8.json index 0667520803a..d9adeb8bfa2 100644 --- a/plugin/storage/es/mappings/jaeger-sampling-8.json +++ b/plugin/storage/es/mappings/jaeger-sampling-8.json @@ -1,5 +1,5 @@ { - "priority": {{ .PrioritySamplingTemplate }}, + "priority": {{ .Priority }}, "index_patterns": "{{ .IndexPrefix }}jaeger-sampling-*", "template": { {{- if .UseILM }} diff --git a/plugin/storage/es/mappings/jaeger-service-7.json b/plugin/storage/es/mappings/jaeger-service-7.json index 0ca2d186319..a0bf5c3f392 100644 --- a/plugin/storage/es/mappings/jaeger-service-7.json +++ b/plugin/storage/es/mappings/jaeger-service-7.json @@ -7,7 +7,7 @@ {{- end }} "settings":{ "index.number_of_shards": {{ .Shards }}, - "index.number_of_replicas": {{ .Replicas }}, + "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true {{- if .UseILM }} diff --git a/plugin/storage/es/mappings/jaeger-service-8.json b/plugin/storage/es/mappings/jaeger-service-8.json index 97ab02f573d..33039fb6be2 100644 --- a/plugin/storage/es/mappings/jaeger-service-8.json +++ b/plugin/storage/es/mappings/jaeger-service-8.json @@ -1,5 +1,5 @@ { - "priority": {{ .PriorityServiceTemplate}}, + "priority": {{ .Priority }}, "index_patterns": "{{ .IndexPrefix }}jaeger-service-*", "template": { {{- if .UseILM }} @@ -9,7 +9,7 @@ {{- end }} "settings": { "index.number_of_shards": {{ .Shards }}, - "index.number_of_replicas": {{ .Replicas }}, + "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": true {{- if .UseILM }}, diff --git a/plugin/storage/es/mappings/jaeger-span-8.json b/plugin/storage/es/mappings/jaeger-span-8.json index d91fadba619..b9ee8a76b8f 100644 --- a/plugin/storage/es/mappings/jaeger-span-8.json +++ b/plugin/storage/es/mappings/jaeger-span-8.json @@ -1,5 +1,5 @@ { - "priority": {{ .PrioritySpanTemplate}}, + "priority": {{ .Priority }}, "index_patterns": "{{ .IndexPrefix }}jaeger-span-*", "template": { diff --git a/plugin/storage/es/mappings/mapping.go b/plugin/storage/es/mappings/mapping.go index 1962f85100d..e1f6786aeff 100644 --- a/plugin/storage/es/mappings/mapping.go +++ b/plugin/storage/es/mappings/mapping.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" ) // MAPPINGS contains embedded index templates. @@ -16,29 +17,61 @@ import ( //go:embed *.json var MAPPINGS embed.FS -// MappingBuilder holds parameters required to render an elasticsearch index template +// MappingBuilder holds common parameters required to render an elasticsearch index template type MappingBuilder struct { - TemplateBuilder es.TemplateBuilder - Shards int64 - Replicas int64 - PrioritySpanTemplate int64 - PriorityServiceTemplate int64 - PriorityDependenciesTemplate int64 - PrioritySamplingTemplate int64 - EsVersion uint - IndexPrefix string - UseILM bool - ILMPolicyName string + TemplateBuilder es.TemplateBuilder + Indices config.Indices + EsVersion uint + UseILM bool + ILMPolicyName string +} + +// templateParams holds parameters required to render an elasticsearch index template +type templateParams struct { + UseILM bool + ILMPolicyName string + IndexPrefix string + Shards int64 + Replicas int64 + Priority int64 +} + +func (mb MappingBuilder) getMappingTemplateOptions(mapping string) templateParams { + mappingOpts := templateParams{} + mappingOpts.UseILM = mb.UseILM + mappingOpts.ILMPolicyName = mb.ILMPolicyName + + switch { + case strings.Contains(mapping, "span"): + mappingOpts.Shards = mb.Indices.Spans.Shards + mappingOpts.Replicas = mb.Indices.Spans.Replicas + mappingOpts.Priority = mb.Indices.Spans.Priority + case strings.Contains(mapping, "service"): + mappingOpts.Shards = mb.Indices.Services.Shards + mappingOpts.Replicas = mb.Indices.Services.Replicas + mappingOpts.Priority = mb.Indices.Services.Priority + case strings.Contains(mapping, "dependencies"): + mappingOpts.Shards = mb.Indices.Dependencies.Shards + mappingOpts.Replicas = mb.Indices.Dependencies.Replicas + mappingOpts.Priority = mb.Indices.Dependencies.Priority + case strings.Contains(mapping, "sampling"): + mappingOpts.Shards = mb.Indices.Sampling.Shards + mappingOpts.Replicas = mb.Indices.Sampling.Replicas + mappingOpts.Priority = mb.Indices.Sampling.Priority + } + + return mappingOpts } // GetMapping returns the rendered mapping based on elasticsearch version func (mb *MappingBuilder) GetMapping(mapping string) (string, error) { + templateOpts := mb.getMappingTemplateOptions(mapping) if mb.EsVersion == 8 { - return mb.fixMapping(mapping + "-8.json") + return mb.renderMapping(mapping+"-8.json", templateOpts) } else if mb.EsVersion == 7 { - return mb.fixMapping(mapping + "-7.json") + return mb.renderMapping(mapping+"-7.json", templateOpts) } - return mb.fixMapping(mapping + "-6.json") + return mb.renderMapping(mapping+"-6.json", templateOpts) } // GetSpanServiceMappings returns span and service mappings @@ -69,17 +102,15 @@ func loadMapping(name string) string { return string(s) } -func (mb *MappingBuilder) fixMapping(mapping string) (string, error) { +func (mb *MappingBuilder) renderMapping(mapping string, options templateParams) (string, error) { tmpl, err := mb.TemplateBuilder.Parse(loadMapping(mapping)) if err != nil { return "", err } writer := new(bytes.Buffer) - if mb.IndexPrefix != "" && !strings.HasSuffix(mb.IndexPrefix, "-") { - mb.IndexPrefix += "-" - } - if err := tmpl.Execute(writer, mb); err != nil { + options.IndexPrefix = mb.Indices.IndexPrefix.Apply("") + if err := tmpl.Execute(writer, options); err != nil { return "", err } diff --git a/plugin/storage/es/mappings/mapping_test.go b/plugin/storage/es/mappings/mapping_test.go index 194a15b2c24..690094e1d13 100644 --- a/plugin/storage/es/mappings/mapping_test.go +++ b/plugin/storage/es/mappings/mapping_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" ) @@ -46,17 +47,29 @@ func TestMappingBuilderGetMapping(t *testing.T) { } for _, tt := range tests { t.Run(tt.mapping, func(t *testing.T) { + defaultOpts := func(p int64) config.IndexOptions { + return config.IndexOptions{ + Shards: 3, + Replicas: 3, + Priority: p, + } + } + serviceOps := defaultOpts(501) + dependenciesOps := defaultOpts(502) + samplingOps := defaultOpts(503) + mb := &MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - Shards: 3, - Replicas: 3, - PrioritySpanTemplate: 500, - PriorityServiceTemplate: 501, - PriorityDependenciesTemplate: 502, - EsVersion: tt.esVersion, - IndexPrefix: "test-", - UseILM: true, - ILMPolicyName: "jaeger-test-policy", + TemplateBuilder: es.TextTemplateBuilder{}, + Indices: config.Indices{ + IndexPrefix: "test-", + Spans: defaultOpts(500), + Services: serviceOps, + Dependencies: dependenciesOps, + Sampling: samplingOps, + }, + EsVersion: tt.esVersion, + UseILM: true, + ILMPolicyName: "jaeger-test-policy", } got, err := mb.GetMapping(tt.mapping) require.NoError(t, err) @@ -137,16 +150,24 @@ func TestMappingBuilderFixMapping(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + indexTemOps := config.IndexOptions{ + Shards: 3, + Replicas: 5, + Priority: 500, + } mappingBuilder := MappingBuilder{ TemplateBuilder: test.templateBuilderMockFunc(), - Shards: 3, - Replicas: 5, - EsVersion: 7, - IndexPrefix: "test", - UseILM: true, - ILMPolicyName: "jaeger-test-policy", + Indices: config.Indices{ + Spans: indexTemOps, + Services: indexTemOps, + Dependencies: indexTemOps, + Sampling: indexTemOps, + }, + EsVersion: 7, + UseILM: true, + ILMPolicyName: "jaeger-test-policy", } - _, err := mappingBuilder.fixMapping("test") + _, err := mappingBuilder.renderMapping("test", mappingBuilder.getMappingTemplateOptions("test")) if test.err != "" { require.EqualError(t, err, test.err) } else { @@ -158,8 +179,6 @@ func TestMappingBuilderFixMapping(t *testing.T) { func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { type args struct { - shards int64 - replicas int64 esVersion uint indexPrefix string useILM bool @@ -174,8 +193,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version 7", args: args{ - shards: 3, - replicas: 3, esVersion: 7, indexPrefix: "test", useILM: true, @@ -193,8 +210,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version 7 Service Error", args: args{ - shards: 3, - replicas: 3, esVersion: 7, indexPrefix: "test", useILM: true, @@ -214,8 +229,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version < 7", args: args{ - shards: 3, - replicas: 3, esVersion: 6, indexPrefix: "test", useILM: true, @@ -233,8 +246,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version < 7 Service Error", args: args{ - shards: 3, - replicas: 3, esVersion: 6, indexPrefix: "test", useILM: true, @@ -253,8 +264,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version < 7 Span Error", args: args{ - shards: 3, - replicas: 3, esVersion: 6, indexPrefix: "test", useILM: true, @@ -272,8 +281,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version 7 Span Error", args: args{ - shards: 3, - replicas: 3, esVersion: 7, indexPrefix: "test", useILM: true, @@ -291,14 +298,22 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + indexTemOps := config.IndexOptions{ + Shards: 3, + Replicas: 3, + } + mappingBuilder := MappingBuilder{ TemplateBuilder: test.mockNewTextTemplateBuilder(), - Shards: test.args.shards, - Replicas: test.args.replicas, - EsVersion: test.args.esVersion, - IndexPrefix: test.args.indexPrefix, - UseILM: test.args.useILM, - ILMPolicyName: test.args.ilmPolicyName, + Indices: config.Indices{ + Spans: indexTemOps, + Services: indexTemOps, + Dependencies: indexTemOps, + Sampling: indexTemOps, + }, + EsVersion: test.args.esVersion, + UseILM: test.args.useILM, + ILMPolicyName: test.args.ilmPolicyName, } _, _, err := mappingBuilder.GetSpanServiceMappings() if test.err != "" { diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 23ae110803e..03f40b027a4 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -67,8 +67,17 @@ const ( defaultIndexRolloverFrequency = "day" defaultSendGetBodyAs = "" + defaultIndexPrefix = "" ) +var defaultIndexOptions = config.IndexOptions{ + DateLayout: initDateLayout(defaultIndexRolloverFrequency, defaultIndexDateSeparator), + RolloverFrequency: defaultIndexRolloverFrequency, + Shards: 5, + Replicas: 1, + Priority: 0, +} + // TODO this should be moved next to config.Configuration struct (maybe ./flags package) // Options contains various type of Elasticsearch configs and provides the ability @@ -159,7 +168,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "Timeout used for queries. A Timeout of zero means no timeout") flagSet.Int64( nsConfig.namespace+suffixNumShards, - nsConfig.NumShards, + nsConfig.Indices.Spans.Shards, "The number of shards per index in Elasticsearch") flagSet.Duration( nsConfig.namespace+suffixServiceCacheTTL, @@ -168,19 +177,19 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { ) flagSet.Int64( nsConfig.namespace+suffixNumReplicas, - nsConfig.NumReplicas, + nsConfig.Indices.Spans.Replicas, "The number of replicas per index in Elasticsearch") flagSet.Int64( nsConfig.namespace+suffixPrioritySpanTemplate, - nsConfig.PrioritySpanTemplate, + nsConfig.Indices.Spans.Priority, "Priority of jaeger-span index template (ESv8 only)") flagSet.Int64( nsConfig.namespace+suffixPriorityServiceTemplate, - nsConfig.PriorityServiceTemplate, + nsConfig.Indices.Services.Priority, "Priority of jaeger-service index template (ESv8 only)") flagSet.Int64( nsConfig.namespace+suffixPriorityDependenciesTemplate, - nsConfig.PriorityDependenciesTemplate, + nsConfig.Indices.Dependencies.Priority, "Priority of jaeger-dependecies index template (ESv8 only)") flagSet.Int( nsConfig.namespace+suffixBulkSize, @@ -200,7 +209,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "A time.Duration after which bulk requests are committed, regardless of other thresholds. Set to zero to disable. By default, this is disabled.") flagSet.String( nsConfig.namespace+suffixIndexPrefix, - nsConfig.IndexPrefix, + string(nsConfig.Indices.IndexPrefix), "Optional prefix of Jaeger indices. For example \"production\" creates \"production-jaeger-*\".") flagSet.String( nsConfig.namespace+suffixIndexDateSeparator, @@ -311,18 +320,32 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Servers = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixServerURLs)), ",") cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) cfg.AdaptiveSamplingLookback = v.GetDuration(cfg.namespace + suffixAdaptiveSamplingLookback) - cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) - cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) - cfg.PrioritySpanTemplate = v.GetInt64(cfg.namespace + suffixPrioritySpanTemplate) - cfg.PriorityServiceTemplate = v.GetInt64(cfg.namespace + suffixPriorityServiceTemplate) - cfg.PriorityDependenciesTemplate = v.GetInt64(cfg.namespace + suffixPriorityDependenciesTemplate) + + cfg.Indices.Spans.Shards = v.GetInt64(cfg.namespace + suffixNumShards) + cfg.Indices.Services.Shards = v.GetInt64(cfg.namespace + suffixNumShards) + cfg.Indices.Sampling.Shards = v.GetInt64(cfg.namespace + suffixNumShards) + cfg.Indices.Dependencies.Shards = v.GetInt64(cfg.namespace + suffixNumShards) + + cfg.Indices.Spans.Replicas = v.GetInt64(cfg.namespace + suffixNumReplicas) + cfg.Indices.Services.Replicas = v.GetInt64(cfg.namespace + suffixNumReplicas) + cfg.Indices.Sampling.Replicas = v.GetInt64(cfg.namespace + suffixNumReplicas) + cfg.Indices.Dependencies.Replicas = v.GetInt64(cfg.namespace + suffixNumReplicas) + + cfg.Indices.Spans.Priority = v.GetInt64(cfg.namespace + suffixPrioritySpanTemplate) + cfg.Indices.Services.Priority = v.GetInt64(cfg.namespace + suffixPriorityServiceTemplate) + // cfg.Indices.Sampling does not have a separate flag + cfg.Indices.Dependencies.Priority = v.GetInt64(cfg.namespace + suffixPriorityDependenciesTemplate) + cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize) cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers) cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions) cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) cfg.ServiceCacheTTL = v.GetDuration(cfg.namespace + suffixServiceCacheTTL) - cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix) + indexPrefix := v.GetString(cfg.namespace + suffixIndexPrefix) + + cfg.Indices.IndexPrefix = config.IndexPrefix(indexPrefix) + cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll) cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude) cfg.Tags.File = v.GetString(cfg.namespace + suffixTagsFile) @@ -345,17 +368,17 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",") } - cfg.IndexRolloverFrequencySpans = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySpans)) - cfg.IndexRolloverFrequencyServices = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencyServices)) - cfg.IndexRolloverFrequencySampling = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySampling)) + cfg.Indices.Spans.RolloverFrequency = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySpans)) + cfg.Indices.Services.RolloverFrequency = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencyServices)) + cfg.Indices.Sampling.RolloverFrequency = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySampling)) separator := v.GetString(cfg.namespace + suffixIndexDateSeparator) - cfg.IndexDateLayoutSpans = initDateLayout(cfg.IndexRolloverFrequencySpans, separator) - cfg.IndexDateLayoutServices = initDateLayout(cfg.IndexRolloverFrequencyServices, separator) - cfg.IndexDateLayoutSampling = initDateLayout(cfg.IndexRolloverFrequencySampling, separator) + cfg.Indices.Spans.DateLayout = initDateLayout(cfg.Indices.Spans.RolloverFrequency, separator) + cfg.Indices.Services.DateLayout = initDateLayout(cfg.Indices.Services.RolloverFrequency, separator) + cfg.Indices.Sampling.DateLayout = initDateLayout(cfg.Indices.Sampling.RolloverFrequency, separator) - // Dependencies calculation should be daily, and this index size is very small - cfg.IndexDateLayoutDependencies = initDateLayout(defaultIndexRolloverFrequency, separator) + // Daily is recommended for dependencies calculation, and this index size is very small + cfg.Indices.Dependencies.DateLayout = initDateLayout(cfg.Indices.Dependencies.DateLayout, separator) var err error cfg.TLS, err = cfg.getTLSFlagsConfig().InitFromViper(v) if err != nil { @@ -399,20 +422,15 @@ func initDateLayout(rolloverFreq, sep string) string { func DefaultConfig() config.Configuration { return config.Configuration{ - Username: "", - Password: "", - Sniffer: false, - MaxSpanAge: 72 * time.Hour, - AdaptiveSamplingLookback: 72 * time.Hour, - NumShards: 5, - NumReplicas: 1, - PrioritySpanTemplate: 0, - PriorityServiceTemplate: 0, - PriorityDependenciesTemplate: 0, - BulkSize: 5 * 1000 * 1000, - BulkWorkers: 1, - BulkActions: 1000, - BulkFlushInterval: time.Millisecond * 200, + Username: "", + Password: "", + Sniffer: false, + MaxSpanAge: 72 * time.Hour, + AdaptiveSamplingLookback: 72 * time.Hour, + BulkSize: 5 * 1000 * 1000, + BulkWorkers: 1, + BulkActions: 1000, + BulkFlushInterval: time.Millisecond * 200, Tags: config.TagsAsFields{ DotReplacement: "@", }, @@ -426,5 +444,11 @@ func DefaultConfig() config.Configuration { MaxDocCount: defaultMaxDocCount, LogLevel: "error", SendGetBodyAs: defaultSendGetBodyAs, + Indices: config.Indices{ + Spans: defaultIndexOptions, + Services: defaultIndexOptions, + Dependencies: defaultIndexOptions, + Sampling: defaultIndexOptions, + }, } } diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 8e08c74a6f1..8b254491667 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" + escfg "github.com/jaegertracing/jaeger/pkg/es/config" ) func TestOptions(t *testing.T) { @@ -22,8 +23,14 @@ func TestOptions(t *testing.T) { assert.Empty(t, primary.PasswordFilePath) assert.NotEmpty(t, primary.Servers) assert.Empty(t, primary.RemoteReadClusters) - assert.Equal(t, int64(5), primary.NumShards) - assert.Equal(t, int64(1), primary.NumReplicas) + assert.EqualValues(t, 5, primary.Indices.Spans.Shards) + assert.EqualValues(t, 5, primary.Indices.Services.Shards) + assert.EqualValues(t, 5, primary.Indices.Sampling.Shards) + assert.EqualValues(t, 5, primary.Indices.Dependencies.Shards) + assert.EqualValues(t, 1, primary.Indices.Spans.Replicas) + assert.EqualValues(t, 1, primary.Indices.Services.Replicas) + assert.EqualValues(t, 1, primary.Indices.Sampling.Replicas) + assert.EqualValues(t, 1, primary.Indices.Dependencies.Replicas) assert.Equal(t, 72*time.Hour, primary.MaxSpanAge) assert.False(t, primary.Sniffer) assert.False(t, primary.SnifferTLSEnabled) @@ -87,22 +94,28 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "!", primary.Tags.DotReplacement) assert.Equal(t, "./file.txt", primary.Tags.File) assert.Equal(t, "test,tags", primary.Tags.Include) - assert.Equal(t, "20060102", primary.IndexDateLayoutServices) - assert.Equal(t, "2006010215", primary.IndexDateLayoutSpans) + assert.Equal(t, "20060102", primary.Indices.Services.DateLayout) + assert.Equal(t, "2006010215", primary.Indices.Spans.DateLayout) aux := opts.Get("es.aux") assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) assert.Equal(t, "hello", aux.Username) assert.Equal(t, "world", aux.Password) - assert.Equal(t, int64(5), aux.NumShards) - assert.Equal(t, int64(10), aux.NumReplicas) + assert.EqualValues(t, 5, aux.Indices.Spans.Shards) + assert.EqualValues(t, 5, aux.Indices.Services.Shards) + assert.EqualValues(t, 5, aux.Indices.Sampling.Shards) + assert.EqualValues(t, 5, aux.Indices.Dependencies.Shards) + assert.EqualValues(t, 10, aux.Indices.Spans.Replicas) + assert.EqualValues(t, 10, aux.Indices.Services.Replicas) + assert.EqualValues(t, 10, aux.Indices.Sampling.Replicas) + assert.EqualValues(t, 10, aux.Indices.Dependencies.Replicas) assert.Equal(t, 24*time.Hour, aux.MaxSpanAge) assert.True(t, aux.Sniffer) assert.True(t, aux.Tags.AllAsFields) assert.Equal(t, "@", aux.Tags.DotReplacement) assert.Equal(t, "./file.txt", aux.Tags.File) assert.Equal(t, "test,tags", aux.Tags.Include) - assert.Equal(t, "2006.01.02", aux.IndexDateLayoutServices) - assert.Equal(t, "2006.01.02.15", aux.IndexDateLayoutSpans) + assert.Equal(t, "2006.01.02", aux.Indices.Services.DateLayout) + assert.Equal(t, "2006.01.02.15", aux.Indices.Spans.DateLayout) assert.True(t, primary.UseILM) assert.Equal(t, "POST", aux.SendGetBodyAs) } @@ -171,7 +184,7 @@ func TestIndexDateSeparator(t *testing.T) { opts.InitFromViper(v) primary := opts.GetPrimary() - assert.Equal(t, tc.wantDateLayout, primary.IndexDateLayoutSpans) + assert.Equal(t, tc.wantDateLayout, primary.Indices.Spans.DateLayout) }) } } @@ -225,10 +238,10 @@ func TestIndexRollover(t *testing.T) { command.ParseFlags(tc.flags) opts.InitFromViper(v) primary := opts.GetPrimary() - assert.Equal(t, tc.wantSpanDateLayout, primary.IndexDateLayoutSpans) - assert.Equal(t, tc.wantServiceDateLayout, primary.IndexDateLayoutServices) - assert.Equal(t, tc.wantSpanIndexRolloverFrequency, primary.GetIndexRolloverFrequencySpansDuration()) - assert.Equal(t, tc.wantServiceIndexRolloverFrequency, primary.GetIndexRolloverFrequencyServicesDuration()) + assert.Equal(t, tc.wantSpanDateLayout, primary.Indices.Spans.DateLayout) + assert.Equal(t, tc.wantServiceDateLayout, primary.Indices.Services.DateLayout) + assert.Equal(t, tc.wantSpanIndexRolloverFrequency, escfg.RolloverFrequencyAsNegativeDuration(primary.Indices.Spans.RolloverFrequency)) + assert.Equal(t, tc.wantServiceIndexRolloverFrequency, escfg.RolloverFrequencyAsNegativeDuration(primary.Indices.Services.RolloverFrequency)) }) } } diff --git a/plugin/storage/es/samplingstore/storage.go b/plugin/storage/es/samplingstore/storage.go index 83ca15b97a3..2450e80eb37 100644 --- a/plugin/storage/es/samplingstore/storage.go +++ b/plugin/storage/es/samplingstore/storage.go @@ -14,14 +14,14 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore/dbmodel" ) const ( - samplingIndex = "jaeger-sampling" - throughputType = "throughput-sampling" - probabilitiesType = "probabilities-sampling" - indexPrefixSeparator = "-" + samplingIndexBaseName = "jaeger-sampling" + throughputType = "throughput-sampling" + probabilitiesType = "probabilities-sampling" ) type SamplingStore struct { @@ -37,7 +37,7 @@ type SamplingStore struct { type Params struct { Client func() es.Client Logger *zap.Logger - IndexPrefix string + IndexPrefix config.IndexPrefix IndexDateLayout string IndexRolloverFrequency time.Duration Lookback time.Duration @@ -48,7 +48,7 @@ func NewSamplingStore(p Params) *SamplingStore { return &SamplingStore{ client: p.Client, logger: p.Logger, - samplingIndexPrefix: p.PrefixedIndexName() + indexPrefixSeparator, + samplingIndexPrefix: p.PrefixedIndexName() + config.IndexPrefixSeparator, indexDateLayout: p.IndexDateLayout, maxDocCount: p.MaxDocCount, indexRolloverFrequency: p.IndexRolloverFrequency, @@ -185,10 +185,7 @@ func getReadIndices(indexName, indexDateLayout string, startTime time.Time, endT } func (p *Params) PrefixedIndexName() string { - if p.IndexPrefix != "" { - return p.IndexPrefix + indexPrefixSeparator + samplingIndex - } - return samplingIndex + return p.IndexPrefix.Apply(samplingIndexBaseName) } func buildTSQuery(start, end time.Time) elastic.Query { diff --git a/plugin/storage/es/samplingstore/storage_test.go b/plugin/storage/es/samplingstore/storage_test.go index cffe63c03d2..0d2764bb2cd 100644 --- a/plugin/storage/es/samplingstore/storage_test.go +++ b/plugin/storage/es/samplingstore/storage_test.go @@ -18,6 +18,7 @@ import ( samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore/dbmodel" @@ -32,7 +33,7 @@ type samplingStorageTest struct { storage *SamplingStore } -func withEsSampling(indexPrefix, indexDateLayout string, maxDocCount int, fn func(w *samplingStorageTest)) { +func withEsSampling(indexPrefix config.IndexPrefix, indexDateLayout string, maxDocCount int, fn func(w *samplingStorageTest)) { client := &mocks.Client{} logger, logBuffer := testutils.NewLogger() w := &samplingStorageTest{ @@ -53,7 +54,7 @@ func withEsSampling(indexPrefix, indexDateLayout string, maxDocCount int, fn fun func TestNewIndexPrefix(t *testing.T) { tests := []struct { name string - prefix string + prefix config.IndexPrefix expected string }{ { @@ -77,7 +78,7 @@ func TestNewIndexPrefix(t *testing.T) { IndexDateLayout: "2006-01-02", MaxDocCount: defaultMaxDocCount, }) - assert.Equal(t, test.expected+samplingIndex+indexPrefixSeparator, r.samplingIndexPrefix) + assert.Equal(t, test.expected+samplingIndexBaseName+config.IndexPrefixSeparator, r.samplingIndexPrefix) }) } } @@ -242,7 +243,7 @@ func TestGetThroughput(t *testing.T) { searchError error expectedError string expectedOutput []*samplemodel.Throughput - indexPrefix string + indexPrefix config.IndexPrefix maxDocCount int index string }{ @@ -306,7 +307,7 @@ func TestGetThroughput(t *testing.T) { if test.indexPrefix != "" { test.indexPrefix += "-" } - index := test.indexPrefix + test.index + index := test.indexPrefix.Apply(test.index) w.client.On("Search", index).Return(searchService) searchService.On("Size", mock.Anything).Return(searchService) searchService.On("Query", mock.Anything).Return(searchService) @@ -350,7 +351,7 @@ func TestGetLatestProbabilities(t *testing.T) { index string indexPresent bool indexError error - indexPrefix string + indexPrefix config.IndexPrefix }{ { name: "good probabilities without prefix", @@ -402,10 +403,7 @@ func TestGetLatestProbabilities(t *testing.T) { t.Run(test.name, func(t *testing.T) { withEsSampling(test.indexPrefix, "2006-01-02", defaultMaxDocCount, func(w *samplingStorageTest) { searchService := &mocks.SearchService{} - if test.indexPrefix != "" { - test.indexPrefix += "-" - } - index := test.indexPrefix + test.index + index := test.indexPrefix.Apply(test.index) w.client.On("Search", index).Return(searchService) searchService.On("Size", mock.Anything).Return(searchService) searchService.On("IgnoreUnavailable", true).Return(searchService) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 2d856c19614..c9f2e07c830 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -20,14 +20,15 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" + cfg "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( - spanIndex = "jaeger-span-" - serviceIndex = "jaeger-service-" + spanIndexBaseName = "jaeger-span-" + serviceIndexBaseName = "jaeger-service-" archiveIndexSuffix = "archive" archiveReadIndexSuffix = archiveIndexSuffix + "-read" archiveWriteIndexSuffix = archiveIndexSuffix + "-write" @@ -84,40 +85,36 @@ type SpanReader struct { client func() es.Client // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. - maxSpanAge time.Duration - serviceOperationStorage *ServiceOperationStorage - spanIndexPrefix string - serviceIndexPrefix string - spanIndexDateLayout string - serviceIndexDateLayout string - spanIndexRolloverFrequency time.Duration - serviceIndexRolloverFrequency time.Duration - spanConverter dbmodel.ToDomain - timeRangeIndices timeRangeIndexFn - sourceFn sourceFn - maxDocCount int - useReadWriteAliases bool - logger *zap.Logger - tracer trace.Tracer + maxSpanAge time.Duration + serviceOperationStorage *ServiceOperationStorage + spanIndexPrefix string + serviceIndexPrefix string + spanIndex cfg.IndexOptions + serviceIndex cfg.IndexOptions + spanConverter dbmodel.ToDomain + timeRangeIndices timeRangeIndexFn + sourceFn sourceFn + maxDocCount int + useReadWriteAliases bool + logger *zap.Logger + tracer trace.Tracer } // SpanReaderParams holds constructor params for NewSpanReader type SpanReaderParams struct { - Client func() es.Client - MaxSpanAge time.Duration - MaxDocCount int - IndexPrefix string - SpanIndexDateLayout string - ServiceIndexDateLayout string - SpanIndexRolloverFrequency time.Duration - ServiceIndexRolloverFrequency time.Duration - TagDotReplacement string - Archive bool - UseReadWriteAliases bool - RemoteReadClusters []string - MetricsFactory metrics.Factory - Logger *zap.Logger - Tracer trace.Tracer + Client func() es.Client + MaxSpanAge time.Duration + MaxDocCount int + IndexPrefix cfg.IndexPrefix + SpanIndex cfg.IndexOptions + ServiceIndex cfg.IndexOptions + TagDotReplacement string + Archive bool + UseReadWriteAliases bool + RemoteReadClusters []string + MetricsFactory metrics.Factory + Logger *zap.Logger + Tracer trace.Tracer } // NewSpanReader returns a new SpanReader with a metrics. @@ -128,23 +125,22 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { if p.UseReadWriteAliases { maxSpanAge = rolloverMaxSpanAge } + return &SpanReader{ - client: p.Client, - maxSpanAge: maxSpanAge, - serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics - spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), - serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), - spanIndexDateLayout: p.SpanIndexDateLayout, - serviceIndexDateLayout: p.ServiceIndexDateLayout, - spanIndexRolloverFrequency: p.SpanIndexRolloverFrequency, - serviceIndexRolloverFrequency: p.SpanIndexRolloverFrequency, - spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), - timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), - sourceFn: getSourceFn(p.Archive, p.MaxDocCount), - maxDocCount: p.MaxDocCount, - useReadWriteAliases: p.UseReadWriteAliases, - logger: p.Logger, - tracer: p.Tracer, + client: p.Client, + maxSpanAge: maxSpanAge, + serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics + spanIndexPrefix: p.IndexPrefix.Apply(spanIndexBaseName), + serviceIndexPrefix: p.IndexPrefix.Apply(serviceIndexBaseName), + spanIndex: p.SpanIndex, + serviceIndex: p.ServiceIndex, + spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), + timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), + sourceFn: getSourceFn(p.Archive, p.MaxDocCount), + maxDocCount: p.MaxDocCount, + useReadWriteAliases: p.UseReadWriteAliases, + logger: p.Logger, + tracer: p.Tracer, } } @@ -221,13 +217,6 @@ func timeRangeIndices(indexName, indexDateLayout string, startTime time.Time, en return indices } -func indexNames(prefix, index string) string { - if prefix != "" { - return prefix + indexPrefixSeparator + index - } - return index -} - // GetTrace takes a traceID and returns a Trace associated with that traceID func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { ctx, span := s.tracer.Start(ctx, "GetTrace") @@ -278,7 +267,13 @@ func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { ctx, span := s.tracer.Start(ctx, "GetService") defer span.End() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) + jaegerIndices := s.timeRangeIndices( + s.serviceIndexPrefix, + s.serviceIndex.DateLayout, + currentTime.Add(-s.maxSpanAge), + currentTime, + cfg.RolloverFrequencyAsNegativeDuration(s.serviceIndex.RolloverFrequency), + ) return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount) } @@ -290,7 +285,13 @@ func (s *SpanReader) GetOperations( ctx, span := s.tracer.Start(ctx, "GetOperations") defer span.End() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) + jaegerIndices := s.timeRangeIndices( + s.serviceIndexPrefix, + s.serviceIndex.DateLayout, + currentTime.Add(-s.maxSpanAge), + currentTime, + cfg.RolloverFrequencyAsNegativeDuration(s.serviceIndex.RolloverFrequency), + ) operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) if err != nil { return nil, err @@ -369,7 +370,13 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. - indices := s.timeRangeIndices(s.spanIndexPrefix, s.spanIndexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour), s.spanIndexRolloverFrequency) + indices := s.timeRangeIndices( + s.spanIndexPrefix, + s.spanIndex.DateLayout, + startTime.Add(-time.Hour), + endTime.Add(time.Hour), + cfg.RolloverFrequencyAsNegativeDuration(s.spanIndex.RolloverFrequency), + ) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) searchAfterTime := make(map[model.TraceID]uint64) totalDocumentsFetched := make(map[model.TraceID]int) @@ -561,7 +568,13 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra // } aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces) boolQuery := s.buildFindTraceIDsQuery(traceQuery) - jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, s.spanIndexDateLayout, traceQuery.StartTimeMin, traceQuery.StartTimeMax, s.spanIndexRolloverFrequency) + jaegerIndices := s.timeRangeIndices( + s.spanIndexPrefix, + s.spanIndex.DateLayout, + traceQuery.StartTimeMin, + traceQuery.StartTimeMax, + cfg.RolloverFrequencyAsNegativeDuration(s.spanIndex.RolloverFrequency), + ) searchService := s.client().Search(jaegerIndices...). Size(0). // set to 0 because we don't want actual documents. diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index c96c7858d8b..7d4b2881d49 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" @@ -110,7 +111,6 @@ func withSpanReader(t *testing.T, fn func(r *spanReaderTest)) { Logger: zap.NewNop(), Tracer: tracer.Tracer("test"), MaxSpanAge: 0, - IndexPrefix: "", TagDotReplacement: "@", MaxDocCount: defaultMaxDocCount, }), @@ -133,7 +133,6 @@ func withArchiveSpanReader(t *testing.T, readAlias bool, fn func(r *spanReaderTe Logger: zap.NewNop(), Tracer: tracer.Tracer("test"), MaxSpanAge: 0, - IndexPrefix: "", TagDotReplacement: "@", Archive: true, UseReadWriteAliases: readAlias, @@ -179,111 +178,124 @@ func TestSpanReaderIndices(t *testing.T) { client := &mocks.Client{} clientFn := func() es.Client { return client } date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC) + spanDataLayout := "2006-01-02-15" serviceDataLayout := "2006-01-02" spanDataLayoutFormat := date.UTC().Format(spanDataLayout) serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) + metricsFactory := metricstest.NewFactory(0) logger, _ := testutils.NewLogger() tracer, _, closer := tracerProvider(t) defer closer() + spanIndexOpts := config.IndexOptions{DateLayout: spanDataLayout} + serviceIndexOpts := config.IndexOptions{DateLayout: serviceDataLayout} + testCases := []struct { indices []string params SpanReaderParams }{ { params: SpanReaderParams{ - IndexPrefix: "", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, + Archive: false, + SpanIndex: spanIndexOpts, + ServiceIndex: serviceIndexOpts, }, - indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, + indices: []string{spanIndexBaseName + spanDataLayoutFormat, serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - IndexPrefix: "", UseReadWriteAliases: true, + UseReadWriteAliases: true, }, - indices: []string{spanIndex + "read", serviceIndex + "read"}, + indices: []string{spanIndexBaseName + "read", serviceIndexBaseName + "read"}, }, { params: SpanReaderParams{ - IndexPrefix: "foo:", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, + Archive: false, + SpanIndex: spanIndexOpts, + ServiceIndex: serviceIndexOpts, + IndexPrefix: "foo:", }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + spanDataLayoutFormat, "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - IndexPrefix: "foo:", UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", UseReadWriteAliases: true, }, - indices: []string{"foo:-" + spanIndex + "read", "foo:-" + serviceIndex + "read"}, + indices: []string{"foo:-" + spanIndexBaseName + "read", "foo:-" + serviceIndexBaseName + "read"}, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: true, + Archive: true, }, - indices: []string{spanIndex + archiveIndexSuffix, serviceIndex + archiveIndexSuffix}, + indices: []string{spanIndexBaseName + archiveIndexSuffix, serviceIndexBaseName + archiveIndexSuffix}, }, { params: SpanReaderParams{ - IndexPrefix: "foo:", Archive: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveIndexSuffix}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveIndexSuffix, "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + archiveIndexSuffix}, }, { params: SpanReaderParams{ - IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveReadIndexSuffix}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveReadIndexSuffix, "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + archiveReadIndexSuffix}, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, + SpanIndex: spanIndexOpts, + ServiceIndex: serviceIndexOpts, + Archive: false, + RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndex + spanDataLayoutFormat, - "cluster_one:" + spanIndex + spanDataLayoutFormat, - "cluster_two:" + spanIndex + spanDataLayoutFormat, - serviceIndex + serviceDataLayoutFormat, - "cluster_one:" + serviceIndex + serviceDataLayoutFormat, - "cluster_two:" + serviceIndex + serviceDataLayoutFormat, + spanIndexBaseName + spanDataLayoutFormat, + "cluster_one:" + spanIndexBaseName + spanDataLayoutFormat, + "cluster_two:" + spanIndexBaseName + spanDataLayoutFormat, + serviceIndexBaseName + serviceDataLayoutFormat, + "cluster_one:" + serviceIndexBaseName + serviceDataLayoutFormat, + "cluster_two:" + serviceIndexBaseName + serviceDataLayoutFormat, }, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndex + archiveIndexSuffix, - "cluster_one:" + spanIndex + archiveIndexSuffix, - "cluster_two:" + spanIndex + archiveIndexSuffix, - serviceIndex + archiveIndexSuffix, - "cluster_one:" + serviceIndex + archiveIndexSuffix, - "cluster_two:" + serviceIndex + archiveIndexSuffix, + spanIndexBaseName + archiveIndexSuffix, + "cluster_one:" + spanIndexBaseName + archiveIndexSuffix, + "cluster_two:" + spanIndexBaseName + archiveIndexSuffix, + serviceIndexBaseName + archiveIndexSuffix, + "cluster_one:" + serviceIndexBaseName + archiveIndexSuffix, + "cluster_two:" + serviceIndexBaseName + archiveIndexSuffix, }, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndex + "read", - "cluster_one:" + spanIndex + "read", - "cluster_two:" + spanIndex + "read", - serviceIndex + "read", - "cluster_one:" + serviceIndex + "read", - "cluster_two:" + serviceIndex + "read", + spanIndexBaseName + "read", + "cluster_one:" + spanIndexBaseName + "read", + "cluster_two:" + spanIndexBaseName + "read", + serviceIndexBaseName + "read", + "cluster_one:" + serviceIndexBaseName + "read", + "cluster_two:" + serviceIndexBaseName + "read", }, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndex + archiveReadIndexSuffix, - "cluster_one:" + spanIndex + archiveReadIndexSuffix, - "cluster_two:" + spanIndex + archiveReadIndexSuffix, - serviceIndex + archiveReadIndexSuffix, - "cluster_one:" + serviceIndex + archiveReadIndexSuffix, - "cluster_two:" + serviceIndex + archiveReadIndexSuffix, + spanIndexBaseName + archiveReadIndexSuffix, + "cluster_one:" + spanIndexBaseName + archiveReadIndexSuffix, + "cluster_two:" + spanIndexBaseName + archiveReadIndexSuffix, + serviceIndexBaseName + archiveReadIndexSuffix, + "cluster_one:" + serviceIndexBaseName + archiveReadIndexSuffix, + "cluster_two:" + serviceIndexBaseName + archiveReadIndexSuffix, }, }, } @@ -294,8 +306,8 @@ func TestSpanReaderIndices(t *testing.T) { testCase.params.Tracer = tracer.Tracer("test") r := NewSpanReader(testCase.params) - actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour) - actualService := r.timeRangeIndices(r.serviceIndexPrefix, r.serviceIndexDateLayout, date, date, -24*time.Hour) + actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndex.DateLayout, date, date, -1*time.Hour) + actualService := r.timeRangeIndices(r.serviceIndexPrefix, r.serviceIndex.DateLayout, date, date, -24*time.Hour) assert.Equal(t, testCase.indices, append(actualSpan, actualService...)) } } @@ -574,30 +586,30 @@ func TestSpanReaderFindIndices(t *testing.T) { startTime: today.Add(-time.Millisecond), endTime: today, expected: []string{ - indexWithDate(spanIndex, dateLayout, today), + indexWithDate(spanIndexBaseName, dateLayout, today), }, }, { startTime: today.Add(-13 * time.Hour), endTime: today, expected: []string{ - indexWithDate(spanIndex, dateLayout, today), - indexWithDate(spanIndex, dateLayout, yesterday), + indexWithDate(spanIndexBaseName, dateLayout, today), + indexWithDate(spanIndexBaseName, dateLayout, yesterday), }, }, { startTime: today.Add(-48 * time.Hour), endTime: today, expected: []string{ - indexWithDate(spanIndex, dateLayout, today), - indexWithDate(spanIndex, dateLayout, yesterday), - indexWithDate(spanIndex, dateLayout, twoDaysAgo), + indexWithDate(spanIndexBaseName, dateLayout, today), + indexWithDate(spanIndexBaseName, dateLayout, yesterday), + indexWithDate(spanIndexBaseName, dateLayout, twoDaysAgo), }, }, } withSpanReader(t, func(r *spanReaderTest) { for _, testCase := range testCases { - actual := r.reader.timeRangeIndices(spanIndex, dateLayout, testCase.startTime, testCase.endTime, -24*time.Hour) + actual := r.reader.timeRangeIndices(spanIndexBaseName, dateLayout, testCase.startTime, testCase.endTime, -24*time.Hour) assert.EqualValues(t, testCase.expected, actual) } }) @@ -605,7 +617,7 @@ func TestSpanReaderFindIndices(t *testing.T) { func TestSpanReader_indexWithDate(t *testing.T) { withSpanReader(t, func(_ *spanReaderTest) { - actual := indexWithDate(spanIndex, "2006-01-02", time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)) + actual := indexWithDate(spanIndexBaseName, "2006-01-02", time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)) assert.Equal(t, "jaeger-span-1995-04-21", actual) }) } diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 9a18677a258..dd32a843ab3 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -7,7 +7,6 @@ package spanstore import ( "context" "fmt" - "strings" "time" "go.uber.org/zap" @@ -15,6 +14,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/es" + cfg "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" @@ -46,18 +46,18 @@ type SpanWriter struct { // SpanWriterParams holds constructor parameters for NewSpanWriter type SpanWriterParams struct { - Client func() es.Client - Logger *zap.Logger - MetricsFactory metrics.Factory - IndexPrefix string - SpanIndexDateLayout string - ServiceIndexDateLayout string - AllTagsAsFields bool - TagKeysAsFields []string - TagDotReplacement string - Archive bool - UseReadWriteAliases bool - ServiceCacheTTL time.Duration + Client func() es.Client + Logger *zap.Logger + MetricsFactory metrics.Factory + SpanIndex cfg.IndexOptions + ServiceIndex cfg.IndexOptions + IndexPrefix cfg.IndexPrefix + AllTagsAsFields bool + TagKeysAsFields []string + TagDotReplacement string + Archive bool + UseReadWriteAliases bool + ServiceCacheTTL time.Duration } // NewSpanWriter creates a new SpanWriter for use @@ -76,22 +76,21 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { }, serviceWriter: serviceOperationStorage.Write, spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), - spanServiceIndex: getSpanAndServiceIndexFn(p.Archive, p.UseReadWriteAliases, p.IndexPrefix, p.SpanIndexDateLayout, p.ServiceIndexDateLayout), + spanServiceIndex: getSpanAndServiceIndexFn(p), } } // CreateTemplates creates index templates. -func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate, indexPrefix string) error { - if indexPrefix != "" && !strings.HasSuffix(indexPrefix, "-") { - indexPrefix += "-" - } - _, err := s.client().CreateTemplate(indexPrefix + "jaeger-span").Body(spanTemplate).Do(context.Background()) +func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string, indexPrefix cfg.IndexPrefix) error { + jaegerSpanIdx := indexPrefix.Apply("jaeger-span") + jaegerServiceIdx := indexPrefix.Apply("jaeger-service") + _, err := s.client().CreateTemplate(jaegerSpanIdx).Body(spanTemplate).Do(context.Background()) if err != nil { - return fmt.Errorf("failed to create template %q: %w", indexPrefix+"jaeger-span", err) + return fmt.Errorf("failed to create template %q: %w", jaegerSpanIdx, err) } - _, err = s.client().CreateTemplate(indexPrefix + "jaeger-service").Body(serviceTemplate).Do(context.Background()) + _, err = s.client().CreateTemplate(jaegerServiceIdx).Body(serviceTemplate).Do(context.Background()) if err != nil { - return fmt.Errorf("failed to create template %q: %w", indexPrefix+"jaeger-service", err) + return fmt.Errorf("failed to create template %q: %w", jaegerServiceIdx, err) } return nil } @@ -99,28 +98,25 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate, indexPrefix // spanAndServiceIndexFn returns names of span and service indices type spanAndServiceIndexFn func(spanTime time.Time) (string, string) -func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix, spanDateLayout string, serviceDateLayout string) spanAndServiceIndexFn { - if prefix != "" { - prefix += indexPrefixSeparator - } - spanIndexPrefix := prefix + spanIndex - serviceIndexPrefix := prefix + serviceIndex - if archive { +func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn { + spanIndexPrefix := p.IndexPrefix.Apply(spanIndexBaseName) + serviceIndexPrefix := p.IndexPrefix.Apply(serviceIndexBaseName) + if p.Archive { return func(_ time.Time) (string, string) { - if useReadWriteAliases { + if p.UseReadWriteAliases { return archiveIndex(spanIndexPrefix, archiveWriteIndexSuffix), "" } return archiveIndex(spanIndexPrefix, archiveIndexSuffix), "" } } - if useReadWriteAliases { + if p.UseReadWriteAliases { return func(_ /* spanTime */ time.Time) (string, string) { return spanIndexPrefix + "write", serviceIndexPrefix + "write" } } return func(date time.Time) (string, string) { - return indexWithDate(spanIndexPrefix, spanDateLayout, date), indexWithDate(serviceIndexPrefix, serviceDateLayout, date) + return indexWithDate(spanIndexPrefix, p.SpanIndex.DateLayout, date), indexWithDate(serviceIndexPrefix, p.ServiceIndex.DateLayout, date) } } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 33314c726ce..381f08271db 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -19,6 +19,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" @@ -40,7 +41,12 @@ func withSpanWriter(fn func(w *spanWriterTest)) { client: client, logger: logger, logBuffer: logBuffer, - writer: NewSpanWriter(SpanWriterParams{Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, SpanIndexDateLayout: "2006-01-02", ServiceIndexDateLayout: "2006-01-02"}), + writer: NewSpanWriter(SpanWriterParams{ + Client: func() es.Client { return client }, + Logger: logger, MetricsFactory: metricsFactory, + SpanIndex: config.IndexOptions{DateLayout: "2006-01-02"}, + ServiceIndex: config.IndexOptions{DateLayout: "2006-01-02"}, + }), } fn(w) } @@ -57,6 +63,10 @@ func TestSpanWriterIndices(t *testing.T) { serviceDataLayout := "2006-01-02" spanDataLayoutFormat := date.UTC().Format(spanDataLayout) serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) + + spanIndexOpts := config.IndexOptions{DateLayout: spanDataLayout} + serviceIndexOpts := config.IndexOptions{DateLayout: serviceDataLayout} + testCases := []struct { indices []string params SpanWriterParams @@ -64,51 +74,51 @@ func TestSpanWriterIndices(t *testing.T) { { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, Archive: false, }, - indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, + indices: []string{spanIndexBaseName + spanDataLayoutFormat, serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, UseReadWriteAliases: true, }, - indices: []string{spanIndex + "write", serviceIndex + "write"}, + indices: []string{spanIndexBaseName + "write", serviceIndexBaseName + "write"}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: false, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + spanDataLayoutFormat, "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", UseReadWriteAliases: true, }, - indices: []string{"foo:-" + spanIndex + "write", "foo:-" + serviceIndex + "write"}, + indices: []string{"foo:-" + spanIndexBaseName + "write", "foo:-" + serviceIndexBaseName + "write"}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, Archive: true, }, - indices: []string{spanIndex + archiveIndexSuffix, ""}, + indices: []string{spanIndexBaseName + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, ""}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveWriteIndexSuffix, ""}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveWriteIndexSuffix, ""}, }, } for _, testCase := range testCases { @@ -211,7 +221,7 @@ func TestCreateTemplates(t *testing.T) { err string spanTemplateService func() *mocks.TemplateCreateService serviceTemplateService func() *mocks.TemplateCreateService - indexPrefix string + indexPrefix config.IndexPrefix }{ { spanTemplateService: func() *mocks.TemplateCreateService { @@ -276,12 +286,10 @@ func TestCreateTemplates(t *testing.T) { for _, test := range tests { withSpanWriter(func(w *spanWriterTest) { - prefix := "" - if test.indexPrefix != "" && !strings.HasSuffix(test.indexPrefix, "-") { - prefix = test.indexPrefix + "-" - } - w.client.On("CreateTemplate", prefix+"jaeger-span").Return(test.spanTemplateService()) - w.client.On("CreateTemplate", prefix+"jaeger-service").Return(test.serviceTemplateService()) + jaegerSpanId := test.indexPrefix.Apply("jaeger-span") + jaegerServiceId := test.indexPrefix.Apply("jaeger-service") + w.client.On("CreateTemplate", jaegerSpanId).Return(test.spanTemplateService()) + w.client.On("CreateTemplate", jaegerServiceId).Return(test.serviceTemplateService()) err := w.writer.CreateTemplates(mock.Anything, mock.Anything, test.indexPrefix) if test.err != "" { require.Error(t, err, test.err) @@ -296,8 +304,8 @@ func TestSpanIndexName(t *testing.T) { span := &model.Span{ StartTime: date, } - spanIndexName := indexWithDate(spanIndex, "2006-01-02", span.StartTime) - serviceIndexName := indexWithDate(serviceIndex, "2006-01-02", span.StartTime) + spanIndexName := indexWithDate(spanIndexBaseName, "2006-01-02", span.StartTime) + serviceIndexName := indexWithDate(serviceIndexBaseName, "2006-01-02", span.StartTime) assert.Equal(t, "jaeger-span-1995-04-21", spanIndexName) assert.Equal(t, "jaeger-service-1995-04-21", serviceIndexName) }