Skip to content

Commit

Permalink
Fix scrape metric kind script
Browse files Browse the repository at this point in the history
  • Loading branch information
alpinskiy committed May 7, 2024
1 parent 57f8777 commit ecf12ab
Showing 1 changed file with 44 additions and 36 deletions.
80 changes: 44 additions & 36 deletions cmd/statshouse/statshouse-tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package main
import (
"bufio"
"context"
"encoding/json"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -616,28 +617,47 @@ func mainPublishTagDrafts() {
}
loader := metajournal.NewMetricMetaLoader(&client, metajournal.DefaultMetaTimeout)
var (
config aggregator.KnownTags
storage *metajournal.MetricsStorage
workMu sync.Mutex
work = make(map[int32]map[int32]format.MetricMetaValue)
workCond = sync.NewCond(&workMu)
namespaceName = "scrape"
namespaceID int32
storage *metajournal.MetricsStorage
workMu sync.Mutex
work = make(map[int32]map[int32]format.MetricMetaValue)
workCond = sync.NewCond(&workMu)
)
storage = metajournal.MakeMetricsStorage("", nil, nil, func(newEntries []tlmetadata.Event) {
var n int
for _, e := range newEntries {
switch e.EventType {
case format.NamespaceEvent:
meta := format.NamespaceMeta{}
err := json.Unmarshal([]byte(e.Data), &meta)
if err != nil {
log.Printf("Cannot marshal metric group %s: %v", meta.Name, err)
continue
}
if meta.Name == namespaceName {
fmt.Printf("%q namespace ID: %d\n", meta.Name, meta.ID)
workCond.L.Lock()
namespaceID = meta.ID
workCond.L.Unlock()
n++
}
case format.MetricEvent:
meta := format.MetricMetaValue{}
err := meta.UnmarshalBinary([]byte(e.Data))
if err != nil {
fmt.Fprintln(os.Stderr, e.Data)
fmt.Fprintln(os.Stderr, err)
fmt.Println(e.Data)
fmt.Println(err)
continue
}
if meta.NamespaceID == 0 || meta.NamespaceID == format.BuiltinNamespaceIDDefault {
continue
}
if len(meta.TagsDraft) == 0 {
if meta.Kind != format.MetricKindCounter {
continue
}
if strings.HasSuffix(meta.Name, "_bucket") {
fmt.Println("skip ", meta.Name)
continue
}
workCond.L.Lock()
Expand All @@ -648,44 +668,36 @@ func mainPublishTagDrafts() {
}
workCond.L.Unlock()
n++
case format.PromConfigEvent:
v, err := aggregator.ParseKnownTags([]byte(e.Data), storage)
fmt.Fprintln(os.Stderr, e.Data)
if err != nil {
fmt.Fprintln(os.Stderr, err)
continue
}
workCond.L.Lock()
config = v
workCond.L.Unlock()
n++
}
}
if n != 0 {
workCond.Signal()
}
})
storage.Journal().Start(nil, nil, loader.LoadJournal)
fmt.Println("Press <Enter> to start publishing tag drafts")
fmt.Println("Press <Enter> to start")
bufio.NewReader(os.Stdin).ReadString('\n')
fmt.Println("Publishing tag drafts")
fmt.Println("Running")
for {
var meta format.MetricMetaValue
workCond.L.Lock()
outer:
for {
var ns map[int32]format.MetricMetaValue
for _, v := range work {
if len(v) != 0 {
ns = v
break
}
if namespaceID == 0 {
workCond.Wait()
continue
}
ns := work[namespaceID]
if len(ns) == 0 {
workCond.Wait()
continue
}
meta = format.MetricMetaValue{}
for k, v := range ns {
if v.Kind != format.MetricKindCounter || strings.HasSuffix(meta.Name, "_bucket") {
delete(ns, k)
continue
}
meta = v
delete(ns, k)
break outer
Expand All @@ -694,26 +706,22 @@ func mainPublishTagDrafts() {
workCond.L.Unlock()
v := storage.GetMetaMetric(meta.MetricID)
if v == nil {
fmt.Fprintf(os.Stderr, "Failed to get metric %q\n", meta.Name)
fmt.Printf("Failed to get metric %q\n", meta.Name)
continue
}
meta = *v
workCond.L.Lock()
n := config.PublishDraftTags(&meta)
workCond.L.Unlock()
if n == 0 {
continue
}
fmt.Println(meta.NamespaceID, meta.Name, meta.Version)
meta.Kind = format.MetricKindValue
fmt.Println(meta.Name, meta.Version)
var err error
meta, err = loader.SaveMetric(context.Background(), meta, "")
if err != nil {
fmt.Fprintln(os.Stderr, err)
fmt.Println(err)
continue
}
err = storage.Journal().WaitVersion(context.Background(), meta.Version)
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}

0 comments on commit ecf12ab

Please sign in to comment.