Skip to content

Commit

Permalink
add prefix and postfix only during final processing
Browse files Browse the repository at this point in the history
instead of just after parsing bucket/key before counting
 - less work if multiple messages aggregate into single bucket

need special handling for receive-counter so it continues to
not have prefix/postfix apply to it

update tests for prefix/postfix applied during different stage

and minor cleanup to TestProcessCounters persist-count-keys bits
  • Loading branch information
ploxiln committed Jul 27, 2018
1 parent 0bff5eb commit 619304e
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 46 deletions.
62 changes: 37 additions & 25 deletions statsdaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func init() {
}

var (
receiveCount uint64
In = make(chan *Packet, MAX_UNPROCESSED_PACKETS)
counters = make(map[string]float64)
gauges = make(map[string]float64)
Expand Down Expand Up @@ -139,10 +140,7 @@ func monitor() {
}

func packetHandler(s *Packet) {
if *receiveCounter != "" {
// if missing gets 0.0 (and adds 1 and stores)
counters[*receiveCounter] += 1
}
receiveCount++

switch s.Modifier {
case "ms":
Expand Down Expand Up @@ -224,21 +222,37 @@ func submit(deadline time.Time) error {

func processCounters(buffer *bytes.Buffer, now int64) int64 {
var num int64
// continue sending zeros for counters for a short period of time even if we have no new data

// avoid adding prefix/postfix to receiveCounter
if *receiveCounter != "" && receiveCount > 0 {
fmt.Fprintf(buffer, "%s %d %d\n", *receiveCounter, receiveCount, now)
if *persistCountKeys > 0 {
countInactivity[*receiveCounter] = 0
}
num++
}
receiveCount = 0

for bucket, value := range counters {
fmt.Fprintf(buffer, "%s %s %d\n", bucket, strconv.FormatFloat(value, 'f', -1, 64), now)
fullbucket := *prefix + bucket + *postfix
fmt.Fprintf(buffer, "%s %s %d\n", fullbucket, strconv.FormatFloat(value, 'f', -1, 64), now)
delete(counters, bucket)
countInactivity[bucket] = 0
if *persistCountKeys > 0 {
countInactivity[fullbucket] = 0
}
num++
}

// continue sending zeros for counters for a short period of time even if we have no new data
for bucket, purgeCount := range countInactivity {
if purgeCount > 0 {
fmt.Fprintf(buffer, "%s 0 %d\n", bucket, now)
num++
}
countInactivity[bucket] += 1
if countInactivity[bucket] > *persistCountKeys {
if purgeCount >= *persistCountKeys {
delete(countInactivity, bucket)
} else {
countInactivity[bucket] = purgeCount + 1
}
}
return num
Expand All @@ -248,7 +262,8 @@ func processGauges(buffer *bytes.Buffer, now int64) int64 {
var num int64

for bucket, currentValue := range gauges {
fmt.Fprintf(buffer, "%s %s %d\n", bucket, strconv.FormatFloat(currentValue, 'f', -1, 64), now)
valstr := strconv.FormatFloat(currentValue, 'f', -1, 64)
fmt.Fprintf(buffer, "%s%s%s %s %d\n", *prefix, bucket, *postfix, valstr, now)
num++
if *deleteGauges {
delete(gauges, bucket)
Expand All @@ -266,7 +281,7 @@ func processSets(buffer *bytes.Buffer, now int64) int64 {
uniqueSet[str] = true
}

fmt.Fprintf(buffer, "%s %d %d\n", bucket, len(uniqueSet), now)
fmt.Fprintf(buffer, "%s%s%s %d %d\n", *prefix, bucket, *postfix, len(uniqueSet), now)
delete(sets, bucket)
}
return num
Expand All @@ -275,7 +290,6 @@ func processSets(buffer *bytes.Buffer, now int64) int64 {
func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 {
var num int64
for bucket, timer := range timers {
bucketWithoutPostfix := bucket[:len(bucket)-len(*postfix)]
num++

sort.Sort(timer)
Expand Down Expand Up @@ -307,27 +321,25 @@ func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 {
maxAtThreshold = timer[indexOfPerc]
}

var tmpl string
var pctstr string
if pct.float >= 0 {
tmpl = "%s.upper_%s%s %s %d\n"
pctstr = pct.str
} else {
tmpl = "%s.lower_%s%s %s %d\n"
ptype := "upper"
pctstr := pct.str
if pct.float < 0 {
ptype = "lower"
pctstr = pct.str[1:]
}
threshold_s := strconv.FormatFloat(maxAtThreshold, 'f', -1, 64)
fmt.Fprintf(buffer, tmpl, bucketWithoutPostfix, pctstr, *postfix, threshold_s, now)
fmt.Fprintf(buffer, "%s%s.%s_%s%s %s %d\n",
*prefix, bucket, ptype, pctstr, *postfix, threshold_s, now)
}

mean_s := strconv.FormatFloat(mean, 'f', -1, 64)
max_s := strconv.FormatFloat(max, 'f', -1, 64)
min_s := strconv.FormatFloat(min, 'f', -1, 64)

fmt.Fprintf(buffer, "%s.mean%s %s %d\n", bucketWithoutPostfix, *postfix, mean_s, now)
fmt.Fprintf(buffer, "%s.upper%s %s %d\n", bucketWithoutPostfix, *postfix, max_s, now)
fmt.Fprintf(buffer, "%s.lower%s %s %d\n", bucketWithoutPostfix, *postfix, min_s, now)
fmt.Fprintf(buffer, "%s.count%s %d %d\n", bucketWithoutPostfix, *postfix, count, now)
fmt.Fprintf(buffer, "%s%s.mean%s %s %d\n", *prefix, bucket, *postfix, mean_s, now)
fmt.Fprintf(buffer, "%s%s.upper%s %s %d\n", *prefix, bucket, *postfix, max_s, now)
fmt.Fprintf(buffer, "%s%s.lower%s %s %d\n", *prefix, bucket, *postfix, min_s, now)
fmt.Fprintf(buffer, "%s%s.count%s %d %d\n", *prefix, bucket, *postfix, count, now)

delete(timers, bucket)
}
Expand Down Expand Up @@ -486,7 +498,7 @@ func parseLine(line []byte) *Packet {
}

return &Packet{
Bucket: *prefix + sanitizeBucket(name) + *postfix,
Bucket: sanitizeBucket(name),
ValFlt: floatval,
ValStr: strval,
Modifier: typeCode,
Expand Down
71 changes: 50 additions & 21 deletions statsdaemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,15 @@ func TestParseLineMisc(t *testing.T) {
assert.Equal(t, float32(1), packet.Sampling)

flag.Set("prefix", "test.")
flag.Set("postfix", ".test")
d = []byte("prefix:4|c")
packet = parseLine(d)
assert.Equal(t, "test.prefix", packet.Bucket)
// prefix/postfix not in Bucket/key, added in processCounters() etc
assert.Equal(t, "prefix", packet.Bucket)
assert.Equal(t, float64(4), packet.ValFlt)
assert.Equal(t, "c", packet.Modifier)
assert.Equal(t, float32(1), packet.Sampling)
flag.Set("prefix", "")

flag.Set("postfix", ".test")
d = []byte("postfix:4|c")
packet = parseLine(d)
assert.Equal(t, "postfix.test", packet.Bucket)
assert.Equal(t, float64(4), packet.ValFlt)
assert.Equal(t, "c", packet.Modifier)
assert.Equal(t, float32(1), packet.Sampling)
flag.Set("postfix", "")

d = []byte("a.key.with-0.dash:4|c\ngauge:3|g")
Expand Down Expand Up @@ -367,8 +361,9 @@ func TestMultiTcp(t *testing.T) {
}

func TestPacketHandlerReceiveCounter(t *testing.T) {
flag.Set("receive-counter", "countme")
receiveCount = 0
counters = make(map[string]float64)
*receiveCounter = "countme"

p := &Packet{
Bucket: "gorets",
Expand All @@ -377,10 +372,11 @@ func TestPacketHandlerReceiveCounter(t *testing.T) {
Sampling: float32(1),
}
packetHandler(p)
assert.Equal(t, counters["countme"], float64(1))

assert.Equal(t, receiveCount, uint64(1))
packetHandler(p)
assert.Equal(t, counters["countme"], float64(2))
assert.Equal(t, receiveCount, uint64(2))

flag.Set("receive-counter", "")
}

func TestPacketHandlerCount(t *testing.T) {
Expand Down Expand Up @@ -491,8 +487,8 @@ func TestPacketHandlerSet(t *testing.T) {
}

func TestProcessCounters(t *testing.T) {

*persistCountKeys = int64(10)
flag.Set("persist-count-keys", "10")
receiveCount = 0
counters = make(map[string]float64)
var buffer bytes.Buffer
now := int64(1418052649)
Expand All @@ -504,7 +500,7 @@ func TestProcessCounters(t *testing.T) {
assert.Equal(t, buffer.String(), "gorets 123 1418052649\n")

// run processCounters() enough times to make sure it purges items
for i := 0; i < int(*persistCountKeys)+10; i++ {
for i := 0; i < int(*persistCountKeys); i++ {
num = processCounters(&buffer, now)
}
lines := bytes.Split(buffer.Bytes(), []byte("\n"))
Expand All @@ -515,6 +511,37 @@ func TestProcessCounters(t *testing.T) {
assert.Equal(t, string(lines[*persistCountKeys]), "gorets 0 1418052649")
}

func TestProcessCountersPrefix(t *testing.T) {
counters = make(map[string]float64)
var buffer bytes.Buffer
now := int64(1418052649)

flag.Set("persist-count-keys", "2")
flag.Set("prefix", "pre.")
flag.Set("postfix", ".post")

counters["gorets"] = float64(123)
num := processCounters(&buffer, now)
firstOutput := buffer.String()
// run processCounters() enough times to make sure it purges items
for i := 0; i < int(*persistCountKeys); i++ {
processCounters(&buffer, now)
}

// set back flags before asserting
flag.Set("persist-count-keys", "60")
flag.Set("prefix", "")
flag.Set("postfix", "")

assert.Equal(t, num, int64(1))
assert.Equal(t, firstOutput, "pre.gorets.post 123 1418052649\n")

lines := bytes.Split(buffer.Bytes(), []byte("\n"))
assert.Equal(t, string(lines[0]), "pre.gorets.post 123 1418052649")
assert.Equal(t, string(lines[1]), "pre.gorets.post 0 1418052649")
assert.Equal(t, string(lines[2]), "pre.gorets.post 0 1418052649")
}

func TestProcessTimers(t *testing.T) {
// Some data with expected mean of 20
timers = make(map[string]Float64Slice)
Expand Down Expand Up @@ -644,11 +671,11 @@ func TestProcessTimersUpperPercentile(t *testing.T) {
}

func TestProcessTimersUpperPercentilePostfix(t *testing.T) {
flag.Set("prefix", "pfx.")
flag.Set("postfix", ".test")
// Some data with expected 75% of 2
timers = make(map[string]Float64Slice)
timers["postfix_response_time.test"] = []float64{0, 1, 2, 3}

timers["postfix_response_time"] = []float64{0, 1, 2, 3}
now := int64(1418052649)

var buffer bytes.Buffer
Expand All @@ -658,12 +685,14 @@ func TestProcessTimersUpperPercentilePostfix(t *testing.T) {
"75",
},
})

lines := bytes.Split(buffer.Bytes(), []byte("\n"))

assert.Equal(t, num, int64(1))
assert.Equal(t, string(lines[0]), "postfix_response_time.upper_75.test 2 1418052649")
// set flags back before asserting
flag.Set("prefix", "")
flag.Set("postfix", "")

assert.Equal(t, num, int64(1))
assert.Equal(t, string(lines[0]), "pfx.postfix_response_time.upper_75.test 2 1418052649")
}

func TestProcessTimesLowerPercentile(t *testing.T) {
Expand Down

0 comments on commit 619304e

Please sign in to comment.