From 907dc5e7b3b46884056855c0c2f137437c6c6e7f Mon Sep 17 00:00:00 2001 From: Jens Rantil Date: Tue, 5 Nov 2013 09:10:30 +0100 Subject: [PATCH 1/3] Parallelize `processTimers(...)` If you have multiple different timers then this will speed up the processing time. --- statsdaemon.go | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/statsdaemon.go b/statsdaemon.go index 4248760..032fd38 100644 --- a/statsdaemon.go +++ b/statsdaemon.go @@ -17,6 +17,7 @@ import ( "strings" "syscall" "time" + "sync" ) const ( @@ -129,7 +130,6 @@ func monitor() { func submit(deadline time.Time) error { var buffer bytes.Buffer - var num int64 now := time.Now().Unix() @@ -152,9 +152,28 @@ func submit(deadline time.Time) error { return errors.New(errmsg) } - num += processCounters(&buffer, now) - num += processGauges(&buffer, now) - num += processTimers(&buffer, now, percentThreshold) + var numchan chan int64 + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + numchan <- processCounters(&buffer, now) + }() + go func() { + defer wg.Done() + numchan <- processGauges(&buffer, now) + }() + go func() { + defer wg.Done() + numchan <- processTimers(&buffer, now, percentThreshold) + }() + wg.Done() + close(numchan) + + var num int64 + for n := range(numchan) { + num += n + } if num == 0 { return nil } From 48f899c4f4633c52c96509858507d0965e7daec8 Mon Sep 17 00:00:00 2001 From: Jens Rantil Date: Thu, 7 Nov 2013 00:43:30 +0100 Subject: [PATCH 2/3] Parallelize `processTimer(...)` --- statsdaemon.go | 107 +++++++++++++++++++++++++++---------------------- 1 file changed, 58 insertions(+), 49 deletions(-) diff --git a/statsdaemon.go b/statsdaemon.go index 032fd38..9e53ed9 100644 --- a/statsdaemon.go +++ b/statsdaemon.go @@ -235,63 +235,72 @@ func processGauges(buffer *bytes.Buffer, now int64) int64 { return num } -func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 { - var num int64 - for u, t := range timers { - if len(t) > 0 { - num++ +func processTimer(buffer *bytes.Buffer, now int64, pctls Percentiles, u string, t Uint64Slice) { + sort.Sort(t) + min := t[0] + max := t[len(t)-1] + maxAtThreshold := max + count := len(t) + + sum := uint64(0) + for _, value := range t { + sum += value + } + mean := float64(sum) / float64(len(t)) - sort.Sort(t) - min := t[0] - max := t[len(t)-1] - maxAtThreshold := max - count := len(t) + for _, pct := range pctls { - sum := uint64(0) - for _, value := range t { - sum += value + if len(t) > 1 { + var abs float64 + if pct.float >= 0 { + abs = pct.float + } else { + abs = 100 + pct.float } - mean := float64(sum) / float64(len(t)) - - for _, pct := range pctls { - - if len(t) > 1 { - var abs float64 - if pct.float >= 0 { - abs = pct.float - } else { - abs = 100 + pct.float - } - // poor man's math.Round(x): - // math.Floor(x + 0.5) - indexOfPerc := int(math.Floor(((abs / 100.0) * float64(count)) + 0.5)) - if pct.float >= 0 { - indexOfPerc -= 1 // index offset=0 - } - maxAtThreshold = t[indexOfPerc] - } - - var tmpl string - var pctstr string - if pct.float >= 0 { - tmpl = "%s.upper_%s %d %d\n" - pctstr = pct.str - } else { - tmpl = "%s.lower_%s %d %d\n" - pctstr = pct.str[1:] - } - fmt.Fprintf(buffer, tmpl, u, pctstr, maxAtThreshold, now) + // poor man's math.Round(x): + // math.Floor(x + 0.5) + indexOfPerc := int(math.Floor(((abs / 100.0) * float64(count)) + 0.5)) + if pct.float >= 0 { + indexOfPerc -= 1 // index offset=0 } + maxAtThreshold = t[indexOfPerc] + } - var z Uint64Slice - timers[u] = z + var tmpl string + var pctstr string + if pct.float >= 0 { + tmpl = "%s.upper_%s %d %d\n" + pctstr = pct.str + } else { + tmpl = "%s.lower_%s %d %d\n" + pctstr = pct.str[1:] + } + fmt.Fprintf(buffer, tmpl, u, pctstr, maxAtThreshold, now) + } + + var z Uint64Slice + timers[u] = z - fmt.Fprintf(buffer, "%s.mean %f %d\n", u, mean, now) - fmt.Fprintf(buffer, "%s.upper %d %d\n", u, max, now) - fmt.Fprintf(buffer, "%s.lower %d %d\n", u, min, now) - fmt.Fprintf(buffer, "%s.count %d %d\n", u, count, now) + fmt.Fprintf(buffer, "%s.mean %f %d\n", u, mean, now) + fmt.Fprintf(buffer, "%s.upper %d %d\n", u, max, now) + fmt.Fprintf(buffer, "%s.lower %d %d\n", u, min, now) + fmt.Fprintf(buffer, "%s.count %d %d\n", u, count, now) +} + +func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 { + var num int64 + var wg sync.WaitGroup + for u, t := range timers { + if len(t) > 0 { + num++ + wg.Add(1) + go func() { + defer wg.Done() + processTimer(buffer, now, pctls, u, t) + }() } } + wg.Wait() return num } From f827106a8ad8298013ba32e4dcba3e0da3e8d5a4 Mon Sep 17 00:00:00 2001 From: Jens Rantil Date: Tue, 12 Nov 2013 15:00:57 +0100 Subject: [PATCH 3/3] Go formatting --- statsdaemon.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/statsdaemon.go b/statsdaemon.go index 9e53ed9..dcb940b 100644 --- a/statsdaemon.go +++ b/statsdaemon.go @@ -15,9 +15,9 @@ import ( "sort" "strconv" "strings" + "sync" "syscall" "time" - "sync" ) const ( @@ -171,7 +171,7 @@ func submit(deadline time.Time) error { close(numchan) var num int64 - for n := range(numchan) { + for n := range numchan { num += n } if num == 0 { @@ -261,7 +261,7 @@ func processTimer(buffer *bytes.Buffer, now int64, pctls Percentiles, u string, // math.Floor(x + 0.5) indexOfPerc := int(math.Floor(((abs / 100.0) * float64(count)) + 0.5)) if pct.float >= 0 { - indexOfPerc -= 1 // index offset=0 + indexOfPerc -= 1 // index offset=0 } maxAtThreshold = t[indexOfPerc] }