Skip to content

Commit

Permalink
allows for zero consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhang.balkundi committed Oct 25, 2023
1 parent d44e93e commit 462380a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 26 deletions.
8 changes: 4 additions & 4 deletions example/sampleapp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func main() {

ks := kafka.Streams{
StreamConfig: kafka.StreamConfig{{
BootstrapServers: "g-gojek-id-mainstream.golabs.io:6668",
Topics: "ziggurat_channel_pool_test",
BootstrapServers: "localhost:9092",
Topics: "json-log",
GroupID: "ziggurat_consumer_local",
ConsumerCount: 2,
RouteGroup: "cpool"}},
Expand Down Expand Up @@ -59,7 +59,7 @@ func main() {
l.Info("retrying finished")
return err
}
return ar.Publish(ctx, event, "bulk_cons", rabbitmq.QueueTypeWorker, "")
return ar.SendToWorker(ctx, event, "bulk_cons")

})

Expand All @@ -69,7 +69,7 @@ func main() {

h := ziggurat.Use(&r, statsClient.PublishEventDelay, statsClient.PublishHandlerMetrics)

if runErr := zig.RunAll(ctx, h, &ks); runErr != nil {
if runErr := zig.RunAll(ctx, h, &ks, ar); runErr != nil {
l.Error("error running streams", runErr)
}

Expand Down
1 change: 0 additions & 1 deletion mw/rabbitmq/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ var getChannelFromDialer = func(ctx context.Context, d *amqpextra.Dialer, timeou
<-done
cfn()
}()

conn, err := d.Connection(timeoutCtx)
if err != nil {
return nil, err
Expand Down
18 changes: 10 additions & 8 deletions mw/rabbitmq/int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ func Test_RetryFlow(t *testing.T) {

cases := []test{
{
PublishCount: 20,
RetryCount: 5,
Name: "handler is called for PublishCount * RetryCount times",
QueueName: "foo",
PublishCount: 20,
RetryCount: 5,
Name: "handler is called for PublishCount * RetryCount times",
QueueName: "foo",
ConsumerCount: 1,
},
{
PublishCount: 10,
RetryCount: 5,
Name: "spawns one consumer when the count is 0",
QueueName: "bar",
PublishCount: 10,
RetryCount: 5,
Name: "spawns one consumer when the count is 0",
QueueName: "bar",
ConsumerCount: 1,
},
{
PublishCount: 10,
Expand Down
42 changes: 29 additions & 13 deletions mw/rabbitmq/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ func AutoRetry(qc Queues, opts ...Opts) *ARetry {

for _, c := range qc {

if c.ConsumerCount < 1 {
c.ConsumerCount = 1
}
// we allow for zero consumers so that they can be run
// separately
r.queueConfig[c.QueueKey] = c
}

Expand Down Expand Up @@ -149,6 +148,10 @@ func (r *ARetry) Retry(ctx context.Context, event *ziggurat.Event, queueKey stri
return r.publish(ctx, event, queueKey)
}

func (r *ARetry) SendToWorker(ctx context.Context, event *ziggurat.Event, queueKey string) error {
return r.Publish(ctx, event, queueKey, QueueTypeWorker, "")
}

func (r *ARetry) Wrap(f ziggurat.HandlerFunc, queueKey string) ziggurat.HandlerFunc {
hf := func(ctx context.Context, event *ziggurat.Event) error {
// start the publishers once only
Expand All @@ -172,7 +175,7 @@ func (r *ARetry) Wrap(f ziggurat.HandlerFunc, queueKey string) ziggurat.HandlerF
return hf
}

func (r *ARetry) InitPublishers(ctx context.Context) error {
func (r *ARetry) initPubPool(ctx context.Context) error {
dialer, err := newDialer(ctx, r.amqpURLs, r.logger)
if err != nil {
return err
Expand All @@ -182,18 +185,21 @@ func (r *ARetry) InitPublishers(ctx context.Context) error {
if err != nil {
return err
}
return nil
}

ch, err := getChannelFromDialer(ctx, r.publishDialer, r.connTimeout)
func (r *ARetry) initQueues(ctx context.Context, d *amqpextra.Dialer) error {
ch, err := getChannelFromDialer(ctx, d, r.connTimeout)
if err != nil {
return err
}

for _, qc := range r.queueConfig {

if qc.Type == WorkerQueue {
if err := createWorkerQueue(ch, qc.QueueKey, r.ogLogger); err != nil {
return fmt.Errorf("error iniitializing publishers:%w", err)
}
continue
}

if err := createQueuesAndExchanges(ch, qc.QueueKey, r.ogLogger); err != nil {
Expand All @@ -206,6 +212,15 @@ func (r *ARetry) InitPublishers(ctx context.Context) error {
return nil
}

func (r *ARetry) InitPublishers(ctx context.Context) error {
err := r.initPubPool(ctx)
if err != nil {
return err
}

return r.initQueues(ctx, r.publishDialer)
}

func (r *ARetry) Stream(ctx context.Context, h ziggurat.Handler) error {
dialer, err := newDialer(ctx, r.amqpURLs, r.logger)
if err != nil {
Expand All @@ -218,18 +233,19 @@ func (r *ARetry) Stream(ctx context.Context, h ziggurat.Handler) error {
return err
}

// twice called
for _, qc := range r.queueConfig {
if err := createQueuesAndExchanges(ch, qc.QueueKey, r.ogLogger); err != nil {
r.ogLogger.Error("error creating queues and exchanges", err)
return fmt.Errorf("error iniitializing publishers:%w", err)
}
}
err = ch.Close()
r.ogLogger.Error("error closing channel", err)
r.once.Do(func() {
r.ogLogger.Info("[amqp] init from stream")
err := r.initQueues(ctx, r.consumeDialer)
if err != nil {
panic(fmt.Sprintf("could not start RabbitMQ publishers:%v", err))
}
})

var wg sync.WaitGroup
for _, qc := range r.queueConfig {
r.ogLogger.Info("starting consumer for", map[string]interface{}{"name": qc.QueueKey})
for i := 0; i < qc.ConsumerCount; i++ {
wg.Add(1)
go func(qc QueueConfig) {
Expand Down

0 comments on commit 462380a

Please sign in to comment.