Skip to content

Commit

Permalink
Merge pull request #209 from pace/add-queue
Browse files Browse the repository at this point in the history
Add queueing system via rmq
  • Loading branch information
Ferlonas authored Jun 22, 2020
2 parents ad9e947 + 759a133 commit c90ea28
Show file tree
Hide file tree
Showing 101 changed files with 4,884 additions and 153 deletions.
22 changes: 22 additions & 0 deletions backend/queue/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package queue

import (
"log"
"time"

"github.com/caarlos0/env"
)

type config struct {
HealthCheckResultTTL time.Duration `env:"RMQ_HEALTH_CHECK_RESULT_TTL" envDefault:"10s"`
MetricsRefreshInterval time.Duration `env:"RMQ_METRICS_REFRESH_INTERVAL" envDefault:"10s"`
}

var cfg config

func init() {
err := env.Parse(&cfg)
if err != nil {
log.Fatalf("Failed to parse queue environment: %v", err)
}
}
77 changes: 77 additions & 0 deletions backend/queue/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package queue

import (
"context"
"time"

"github.com/adjust/rmq/v2"
"github.com/pace/bricks/maintenance/log"
"github.com/pace/bricks/pkg/routine"
"github.com/prometheus/client_golang/prometheus"
)

type queueStatsGauges struct {
readyGauge *prometheus.GaugeVec
rejectedGauge *prometheus.GaugeVec
connectionGauge *prometheus.GaugeVec
consumerGauge *prometheus.GaugeVec
unackedGauge *prometheus.GaugeVec
}

func gatherMetrics(connection rmq.Connection) {
gauges := registerConnection(connection)
ctx := log.WithContext(context.Background())

routine.Run(ctx, func(_ context.Context) {
stats := connection.CollectStats(connection.GetOpenQueues())
for queue, queueStats := range stats.QueueStats {
labels := prometheus.Labels{
"queue": queue,
}
gauges.readyGauge.With(labels).Set(float64(queueStats.ReadyCount))
gauges.rejectedGauge.With(labels).Set(float64(queueStats.RejectedCount))
gauges.connectionGauge.With(labels).Set(float64(queueStats.ConnectionCount()))
gauges.consumerGauge.With(labels).Set(float64(queueStats.ConsumerCount()))
gauges.unackedGauge.With(labels).Set(float64(queueStats.UnackedCount()))
}
time.Sleep(cfg.MetricsRefreshInterval)
})
}

func registerConnection(connection rmq.Connection) queueStatsGauges {
gauges := queueStatsGauges{
readyGauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rmq",
Name: "ready",
Help: "Number of ready messages on queue",
}, []string{"queue"}),
rejectedGauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rmq",
Name: "rejected",
Help: "Number of rejected messages on queue",
}, []string{"queue"}),
connectionGauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rmq",
Name: "connection",
Help: "Number of connections consuming a queue",
}, []string{"queue"}),
consumerGauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rmq",
Name: "consumer",
Help: "Number of consumers consuming messages for a queue",
}, []string{"queue"}),
unackedGauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rmq",
Name: "unacked",
Help: "Number of unacked messages on a consumer",
}, []string{"queue"}),
}

prometheus.MustRegister(gauges.readyGauge)
prometheus.MustRegister(gauges.rejectedGauge)
prometheus.MustRegister(gauges.connectionGauge)
prometheus.MustRegister(gauges.consumerGauge)
prometheus.MustRegister(gauges.unackedGauge)

return gauges
}
66 changes: 66 additions & 0 deletions backend/queue/rmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package queue

import (
"context"
"fmt"
"sync"
"time"

"github.com/adjust/rmq/v2"
"github.com/pace/bricks/backend/redis"
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
)

var (
rmqConnection rmq.Connection
queueHealthLimits sync.Map
)

func init() {
rmqConnection = rmq.OpenConnectionWithRedisClient("default", redis.Client())
gatherMetrics(rmqConnection)
servicehealthcheck.RegisterHealthCheck("rmq", &HealthCheck{})
}

// NewQueue creates a new rmq.Queue and initializes health checks for this queue
// Whenever the number of items in the queue exceeds the healthyLimit
// The queue will be reported as unhealthy
// If the queue has already been opened, it will just be returned. Limits will not
// be updated
func NewQueue(name string, healthyLimit int) rmq.Queue {
queue := rmqConnection.OpenQueue(name)
if _, ok := queueHealthLimits.Load(name); ok {
return queue
}
queueHealthLimits.Store(name, healthyLimit)
return queue
}

type HealthCheck struct {
state servicehealthcheck.ConnectionState
// IgnoreInterval is a switch used for testing, just to allow multiple
// functional queries of HealthCheck in rapid bursts
IgnoreInterval bool
}

// HealthCheck checks if the queues are healthy, i.e. whether the number of
// items accumulated is below the healthyLimit defined when opening the queue
func (h *HealthCheck) HealthCheck(ctx context.Context) servicehealthcheck.HealthCheckResult {
if !h.IgnoreInterval && time.Since(h.state.LastChecked()) <= cfg.HealthCheckResultTTL {
return h.state.GetState()
}

stats := rmqConnection.CollectStats(rmqConnection.GetOpenQueues())
queueHealthLimits.Range(func(k, v interface{}) bool {
name := k.(string)
healthLimit := v.(int)
stat := stats.QueueStats[name]
if stat.ReadyCount > healthLimit {
h.state.SetErrorState(fmt.Errorf("Queue '%s' exceeded safe health limit of '%d'", name, healthLimit))
return false
}
h.state.SetHealthy()
return true
})
return h.state.GetState()
}
31 changes: 31 additions & 0 deletions backend/queue/rmq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package queue_test

import (
"context"
"testing"

"github.com/pace/bricks/backend/queue"
"github.com/pace/bricks/maintenance/log"
)

func TestIntegrationHealthCheck(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx := log.WithContext(context.Background())
q1 := queue.NewQueue("integrationTestTasks", 1)
q1.Publish("nothing here")

check := &queue.HealthCheck{IgnoreInterval: true}
res := check.HealthCheck(ctx)
if res.State != "OK" {
t.Errorf("Expected health check to be OK for a non-full queue")
}

q1.Publish("nothing here either")

res = check.HealthCheck(ctx)
if res.State == "OK" {
t.Errorf("Expected health check to be ERR for a full queue")
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/pace/bricks
go 1.14

require (
github.com/adjust/rmq/v2 v2.0.0-20200523123200-98c5e969f342
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/bsm/redislock v0.5.0
Expand Down
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
github.com/adjust/gocheck v0.0.0-20131111155431-fbc315b36e0e h1:eiFUF06iaKUDS3HVFSlRYEL0ddnQ+HAGIis/kENW+Ug=
github.com/adjust/gocheck v0.0.0-20131111155431-fbc315b36e0e/go.mod h1:x8X/algNhAAR28ODU+0TzjBwcr7CHA1F/o27Ov/rFGQ=
github.com/adjust/rmq/v2 v2.0.0-20200523123200-98c5e969f342 h1:B3oELdorwpFJeJeqXEMV8grlXIGeKWmPONdUO2ghH2c=
github.com/adjust/rmq/v2 v2.0.0-20200523123200-98c5e969f342/go.mod h1:fWScdzE1sTK0bCeW2abHbcq24cwiKrbXWnPiYL+ijWM=
github.com/adjust/uniuri v0.0.0-20130923163420-498743145e60 h1:ogL5Ct/E8o3w/QiBWDFJV9fOXglEiXI+YaYIqWNCJ8Y=
github.com/adjust/uniuri v0.0.0-20130923163420-498743145e60/go.mod h1:pgVmNTYfZOWG+PrCVPcvgUy5Z/uowI78tK8ARMsdVXw=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf h1:eg0MeVzsP1G42dRafH3vf+al2vQIJU0YHX+1Tw87oco=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
Expand Down Expand Up @@ -69,8 +75,13 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU=
github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg=
github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down Expand Up @@ -153,6 +164,8 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/p
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e h1:N7DeIrjYszNmSW409R3frPPwglRwMkXSBzwVbkOjLLA=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
Expand Down Expand Up @@ -187,3 +200,5 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM=
27 changes: 27 additions & 0 deletions vendor/github.com/adjust/rmq/v2/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/adjust/rmq/v2/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions vendor/github.com/adjust/rmq/v2/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c90ea28

Please sign in to comment.