diff --git a/storage/clickhousespanstore/heap.go b/storage/clickhousespanstore/heap.go index b7005e83..95956888 100644 --- a/storage/clickhousespanstore/heap.go +++ b/storage/clickhousespanstore/heap.go @@ -12,10 +12,11 @@ var ( ) type heapItem struct { - startTime time.Time - worker *WriteWorker + pushTime time.Time + worker *WriteWorker } +// workerHeap is a heap for WriteWorkers where worker's push time is the key. type workerHeap struct { elems *[]*heapItem indexes map[*WriteWorker]int @@ -31,8 +32,8 @@ func newWorkerHeap(cap int) workerHeap { func (workerHeap workerHeap) AddWorker(worker *WriteWorker) { heap.Push(workerHeap, heapItem{ - worker: worker, - startTime: time.Now(), + worker: worker, + pushTime: time.Now(), }) } @@ -56,7 +57,7 @@ func (workerHeap workerHeap) Len() int { } func (workerHeap workerHeap) Less(i, j int) bool { - return (*workerHeap.elems)[i].startTime.Before((*workerHeap.elems)[j].startTime) + return (*workerHeap.elems)[i].pushTime.Before((*workerHeap.elems)[j].pushTime) } func (workerHeap workerHeap) Swap(i, j int) { diff --git a/storage/clickhousespanstore/params.go b/storage/clickhousespanstore/params.go index cea92c82..08f7943c 100644 --- a/storage/clickhousespanstore/params.go +++ b/storage/clickhousespanstore/params.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/go-hclog" ) +// WriteParams contains parameters that are shared between WriteWorker`s type WriteParams struct { logger hclog.Logger db *sql.DB diff --git a/storage/clickhousespanstore/pool.go b/storage/clickhousespanstore/pool.go index 36200b5d..9ee55325 100644 --- a/storage/clickhousespanstore/pool.go +++ b/storage/clickhousespanstore/pool.go @@ -10,6 +10,9 @@ import ( const maxSpanCount int = 10000000 +// WriteWorkerPool is a worker pool for writing batches of spans. +// Given a new batch, WriteWorkerPool creates a new WriteWorker. +// If the number of currently processed spans if more than maxSpanCount, then the oldest worker is removed. type WriteWorkerPool struct { params *WriteParams @@ -19,9 +22,8 @@ type WriteWorkerPool struct { totalSpanCount int mutex sync.Mutex - // TODO: rewrite on using heap - workers workerHeap - workerDone chan *WriteWorker + workers workerHeap + workerDone chan *WriteWorker } func NewWorkerPool(params *WriteParams) WriteWorkerPool { @@ -31,8 +33,7 @@ func NewWorkerPool(params *WriteParams) WriteWorkerPool { done: sync.WaitGroup{}, batches: make(chan []*model.Span), - mutex: sync.Mutex{}, - // TODO: decide on size + mutex: sync.Mutex{}, workers: newWorkerHeap(100), workerDone: make(chan *WriteWorker), } diff --git a/storage/clickhousespanstore/worker.go b/storage/clickhousespanstore/worker.go index 782c53a1..a18efa26 100644 --- a/storage/clickhousespanstore/worker.go +++ b/storage/clickhousespanstore/worker.go @@ -14,6 +14,9 @@ import ( var delays = []int{2, 3, 5, 8} +// WriteWorker writes spans to CLickHouse. +// Given a batch of spans, WriteWorker attempts to write them to database. +// Interval in seconds between attempts changes due to delays slice, then it remains the same as the last value in delays. type WriteWorker struct { params *WriteParams