Skip to content

Commit

Permalink
Add max span count parameter (#96)
Browse files Browse the repository at this point in the history
* Added MaxSpanCount param to config

Signed-off-by: Yury Frolov <[email protected]>

* Added MaxSpanCount param to pool

Signed-off-by: Yury Frolov <[email protected]>

* Added test for default value of maxSpanCount

Signed-off-by: Yury Frolov <[email protected]>
  • Loading branch information
EinKrebs authored Oct 1, 2021
1 parent f47c43c commit 63dd379
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 16 deletions.
2 changes: 2 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
address: tcp://some-clickhouse-server:9000
# When empty the embedded scripts from sqlscripts directory are used
init_sql_scripts_dir:
# Maximum number of spans that can be written at the same time. Default 10_000_000
max_span_count:
# Batch write size. Default 10_000.
batch_write_size:
# Batch flush interval. Default 5s.
Expand Down
9 changes: 5 additions & 4 deletions storage/clickhousespanstore/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/jaegertracing/jaeger/model"
)

const maxSpanCount int = 10000000

// WriteWorkerPool is a worker pool for writing batches of spans.
// Given a new batch, WriteWorkerPool creates a new WriteWorker.
// If the number of currently processed spans is more than maxSpanCount, then the oldest worker is removed.
Expand All @@ -21,12 +19,13 @@ type WriteWorkerPool struct {
batches chan []*model.Span

totalSpanCount int
maxSpanCount int
mutex sync.Mutex
workers workerHeap
workerDone chan *WriteWorker
}

func NewWorkerPool(params *WriteParams) WriteWorkerPool {
func NewWorkerPool(params *WriteParams, maxSpanCount int) WriteWorkerPool {
return WriteWorkerPool{
params: params,
finish: make(chan bool),
Expand All @@ -36,6 +35,8 @@ func NewWorkerPool(params *WriteParams) WriteWorkerPool {
mutex: sync.Mutex{},
workers: newWorkerHeap(100),
workerDone: make(chan *WriteWorker),

maxSpanCount: maxSpanCount,
}
}

Expand Down Expand Up @@ -85,7 +86,7 @@ func (pool *WriteWorkerPool) CLose() {

func (pool *WriteWorkerPool) CleanWorkers(batchSize int) {
pool.mutex.Lock()
if pool.totalSpanCount+batchSize > maxSpanCount {
if pool.totalSpanCount+batchSize > pool.maxSpanCount {
earliest := heap.Pop(pool.workers)
switch worker := earliest.(type) {
case WriteWorker:
Expand Down
17 changes: 13 additions & 4 deletions storage/clickhousespanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,16 @@ var registerMetrics sync.Once
var _ spanstore.Writer = (*SpanWriter)(nil)

// NewSpanWriter returns a SpanWriter for the database
func NewSpanWriter(logger hclog.Logger, db *sql.DB, indexTable, spansTable TableName, encoding Encoding, delay time.Duration, size int64) *SpanWriter {
func NewSpanWriter(
logger hclog.Logger,
db *sql.DB,
indexTable,
spansTable TableName,
encoding Encoding,
delay time.Duration,
size int64,
maxSpanCount int,
) *SpanWriter {
writer := &SpanWriter{
writeParams: WriteParams{
logger: logger,
Expand All @@ -63,7 +72,7 @@ func NewSpanWriter(logger hclog.Logger, db *sql.DB, indexTable, spansTable Table
}

writer.registerMetrics()
go writer.backgroundWriter()
go writer.backgroundWriter(maxSpanCount)

return writer
}
Expand All @@ -75,8 +84,8 @@ func (w *SpanWriter) registerMetrics() {
})
}

func (w *SpanWriter) backgroundWriter() {
pool := NewWorkerPool(&w.writeParams)
func (w *SpanWriter) backgroundWriter(maxSpanCount int) {
pool := NewWorkerPool(&w.writeParams, maxSpanCount)
go pool.Work()
batch := make([]*model.Span, 0, w.size)

Expand Down
6 changes: 6 additions & 0 deletions storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
defaultEncoding = JSONEncoding
JSONEncoding EncodingType = "json"
ProtobufEncoding EncodingType = "protobuf"
defaultMaxSpanCount = int(1e7)
defaultBatchSize = 10_000
defaultBatchDelay = time.Second * 5
defaultUsername = "default"
Expand All @@ -28,6 +29,8 @@ type Configuration struct {
BatchWriteSize int64 `yaml:"batch_write_size"`
// Batch flush interval. Default is 5s.
BatchFlushInterval time.Duration `yaml:"batch_flush_interval"`
// Maximum number of spans that can be written at the same time. Default is 10_000_000.
MaxSpanCount int `yaml:"max_span_count"`
// Encoding either json or protobuf. Default is json.
Encoding EncodingType `yaml:"encoding"`
// ClickHouse address e.g. tcp://localhost:9000.
Expand Down Expand Up @@ -64,6 +67,9 @@ func (cfg *Configuration) setDefaults() {
if cfg.BatchFlushInterval == 0 {
cfg.BatchFlushInterval = defaultBatchDelay
}
if cfg.MaxSpanCount == 0 {
cfg.MaxSpanCount = defaultMaxSpanCount
}
if cfg.Encoding == "" {
cfg.Encoding = defaultEncoding
}
Expand Down
4 changes: 4 additions & 0 deletions storage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func TestSetDefaults(t *testing.T) {
getField: func(config Configuration) interface{} { return config.BatchFlushInterval },
expected: defaultBatchDelay,
},
"max span count": {
getField: func(config Configuration) interface{} { return config.MaxSpanCount },
expected: defaultMaxSpanCount,
},
"metrics endpoint": {
getField: func(config Configuration) interface{} { return config.MetricsEndpoint },
expected: defaultMetricsEndpoint,
Expand Down
20 changes: 12 additions & 8 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,22 @@ func NewStore(logger hclog.Logger, cfg Configuration) (*Store, error) {
}
if cfg.Replication {
return &Store{
db: db,
writer: clickhousespanstore.NewSpanWriter(logger, db, cfg.SpansIndexTable, cfg.SpansTable, clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize),
reader: clickhousespanstore.NewTraceReader(db, cfg.OperationsTable, cfg.SpansIndexTable, cfg.SpansTable),
archiveWriter: clickhousespanstore.NewSpanWriter(logger, db, "", cfg.GetSpansArchiveTable(), clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize),
db: db,
writer: clickhousespanstore.NewSpanWriter(logger, db, cfg.SpansIndexTable, cfg.SpansTable,
clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize, cfg.MaxSpanCount),
reader: clickhousespanstore.NewTraceReader(db, cfg.OperationsTable, cfg.SpansIndexTable, cfg.SpansTable),
archiveWriter: clickhousespanstore.NewSpanWriter(logger, db, "", cfg.GetSpansArchiveTable(),
clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize, cfg.MaxSpanCount),
archiveReader: clickhousespanstore.NewTraceReader(db, "", "", cfg.GetSpansArchiveTable()),
}, nil
}
return &Store{
db: db,
writer: clickhousespanstore.NewSpanWriter(logger, db, cfg.SpansIndexTable, cfg.SpansTable, clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize),
reader: clickhousespanstore.NewTraceReader(db, cfg.OperationsTable, cfg.SpansIndexTable, cfg.SpansTable),
archiveWriter: clickhousespanstore.NewSpanWriter(logger, db, "", cfg.GetSpansArchiveTable(), clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize),
db: db,
writer: clickhousespanstore.NewSpanWriter(logger, db, cfg.SpansIndexTable, cfg.SpansTable,
clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize, cfg.MaxSpanCount),
reader: clickhousespanstore.NewTraceReader(db, cfg.OperationsTable, cfg.SpansIndexTable, cfg.SpansTable),
archiveWriter: clickhousespanstore.NewSpanWriter(logger, db, "", cfg.GetSpansArchiveTable(),
clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize, cfg.MaxSpanCount),
archiveReader: clickhousespanstore.NewTraceReader(db, "", "", cfg.GetSpansArchiveTable()),
}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func newStore(db *sql.DB, logger mocks.SpyLogger) Store {
clickhousespanstore.EncodingJSON,
0,
0,
0,
),
reader: clickhousespanstore.NewTraceReader(
db,
Expand All @@ -101,6 +102,7 @@ func newStore(db *sql.DB, logger mocks.SpyLogger) Store {
clickhousespanstore.EncodingJSON,
0,
0,
0,
),
archiveReader: clickhousespanstore.NewTraceReader(
db,
Expand Down

0 comments on commit 63dd379

Please sign in to comment.