Skip to content

Commit

Permalink
fixes CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubhang Balkundi committed Mar 25, 2024
1 parent 4c44596 commit d76b6a2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
61 changes: 39 additions & 22 deletions cmd/ziggurat/templates/main.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,63 @@ package main

import (
"context"
"math/rand"
"time"

"github.com/gojekfarm/ziggurat/mw/statsd"

"github.com/gojekfarm/ziggurat"
"github.com/gojekfarm/ziggurat/kafka"
"github.com/gojekfarm/ziggurat/logger"
"github.com/gojekfarm/ziggurat/mw/statsd"
"github.com/gojekfarm/ziggurat/mw/rabbitmq"
)

func main() {
var zig ziggurat.Ziggurat
var r kafka.Router
router := ziggurat.NewRouter()
statsdPub := statsd.NewPublisher(statsd.WithDefaultTags(map[string]string{
"app_name": "{{.AppName}}",
}))
ctx := context.Background()
l := logger.NewLogger(logger.LevelInfo)
ks := kafka.Streams{
StreamConfig: kafka.StreamConfig{
{
BootstrapServers: "localhost:9092",
Topics: "plain-text-log",
GroupID: "{{.AppName}}_consumer",
ConsumerCount: 1,
RouteGroup: "plain-text-group",
},
statsClient := statsd.NewPublisher(statsd.WithLogger(l),
statsd.WithDefaultTags(statsd.StatsDTag{"ziggurat-version": "v162"}),
statsd.WithPrefix("ziggurat_v162"))

kcg := kafka.ConsumerGroup{
Logger: nil,
GroupConfig: kafka.ConsumerConfig{
BootstrapServers: "localhost:9092",
GroupID: "foo.id",
Topics: []string{"foo"},
},
Logger: l,
}

r.HandleFunc("plain-text-group/*", func(ctx context.Context, event *ziggurat.Event) error {
ar := rabbitmq.AutoRetry([]rabbitmq.QueueConfig{
{
QueueKey: "plain_text_messages_retry",
DelayExpirationInMS: "500",
ConsumerPrefetchCount: 1,
ConsumerCount: 1,
RetryCount: 1,
},
}, rabbitmq.WithLogger(l),
rabbitmq.WithUsername("user"),
rabbitmq.WithPassword("bitnami"),
rabbitmq.WithConnectionTimeout(3*time.Second))

router.HandlerFunc("cpool/", func(ctx context.Context, event *ziggurat.Event) error {
if rand.Intn(1000)%2 == 0 {
l.Info("retrying")
err := ar.Retry(ctx, event, "plain_text_messages_retry")
l.Info("retrying finished")
return err
}
return nil
})

h := ziggurat.Use(router, statsClient.PublishEventDelay, statsClient.PublishHandlerMetrics)

zig.StartFunc(func(ctx context.Context) {
err := statsdPub.Run(ctx)
l.Error("statsd publisher error", err)
})

if runErr := zig.RunAll(ctx, &r, &ks); runErr != nil {
if runErr := zig.Run(ctx, h, &kcg); runErr != nil {
l.Error("error running streams", runErr)
}

Expand Down
2 changes: 2 additions & 0 deletions example/sampleapp/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build ignore

package main

import (
Expand Down

0 comments on commit d76b6a2

Please sign in to comment.