Skip to content

Commit

Permalink
Added comments for new types (#94)
Browse files Browse the repository at this point in the history
* Added comments for new types

Signed-off-by: Yury Frolov <[email protected]>

* Added comment for WriteParams

Signed-off-by: Yury Frolov <[email protected]>

* Formatted & fixed lint issues

Signed-off-by: Yury Frolov <[email protected]>
  • Loading branch information
EinKrebs authored Oct 1, 2021
1 parent 9e90179 commit f47c43c
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 10 deletions.
11 changes: 6 additions & 5 deletions storage/clickhousespanstore/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ var (
)

type heapItem struct {
startTime time.Time
worker *WriteWorker
pushTime time.Time
worker *WriteWorker
}

// workerHeap is a heap for WriteWorkers where worker's push time is the key.
type workerHeap struct {
elems *[]*heapItem
indexes map[*WriteWorker]int
Expand All @@ -31,8 +32,8 @@ func newWorkerHeap(cap int) workerHeap {

func (workerHeap workerHeap) AddWorker(worker *WriteWorker) {
heap.Push(workerHeap, heapItem{
worker: worker,
startTime: time.Now(),
worker: worker,
pushTime: time.Now(),
})
}

Expand All @@ -56,7 +57,7 @@ func (workerHeap workerHeap) Len() int {
}

func (workerHeap workerHeap) Less(i, j int) bool {
return (*workerHeap.elems)[i].startTime.Before((*workerHeap.elems)[j].startTime)
return (*workerHeap.elems)[i].pushTime.Before((*workerHeap.elems)[j].pushTime)
}

func (workerHeap workerHeap) Swap(i, j int) {
Expand Down
1 change: 1 addition & 0 deletions storage/clickhousespanstore/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/hashicorp/go-hclog"
)

// WriteParams contains parameters that are shared between WriteWorker`s
type WriteParams struct {
logger hclog.Logger
db *sql.DB
Expand Down
11 changes: 6 additions & 5 deletions storage/clickhousespanstore/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (

const maxSpanCount int = 10000000

// WriteWorkerPool is a worker pool for writing batches of spans.
// Given a new batch, WriteWorkerPool creates a new WriteWorker.
// If the number of currently processed spans if more than maxSpanCount, then the oldest worker is removed.
type WriteWorkerPool struct {
params *WriteParams

Expand All @@ -19,9 +22,8 @@ type WriteWorkerPool struct {

totalSpanCount int
mutex sync.Mutex
// TODO: rewrite on using heap
workers workerHeap
workerDone chan *WriteWorker
workers workerHeap
workerDone chan *WriteWorker
}

func NewWorkerPool(params *WriteParams) WriteWorkerPool {
Expand All @@ -31,8 +33,7 @@ func NewWorkerPool(params *WriteParams) WriteWorkerPool {
done: sync.WaitGroup{},
batches: make(chan []*model.Span),

mutex: sync.Mutex{},
// TODO: decide on size
mutex: sync.Mutex{},
workers: newWorkerHeap(100),
workerDone: make(chan *WriteWorker),
}
Expand Down
3 changes: 3 additions & 0 deletions storage/clickhousespanstore/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (

var delays = []int{2, 3, 5, 8}

// WriteWorker writes spans to CLickHouse.
// Given a batch of spans, WriteWorker attempts to write them to database.
// Interval in seconds between attempts changes due to delays slice, then it remains the same as the last value in delays.
type WriteWorker struct {
params *WriteParams

Expand Down

0 comments on commit f47c43c

Please sign in to comment.