Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve refs garbage collection for redis Redis using TTL #983

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"fmt"
"time"

"github.com/creasty/defaults"
"github.com/go-playground/validator/v10"
Expand Down Expand Up @@ -131,6 +132,10 @@ type Redis struct {
// URL used to connect onto the redis endpoint
// format: redis[s]://[:password@]host[:port][/db-number][?option=value])
URL string `yaml:"url"`

ProjectTTL time.Duration `default:"168h" yaml:"project_ttl"`
RefTTL time.Duration `default:"1h" yaml:"ref_ttl"`
MetricTTL time.Duration `default:"1h" yaml:"metric_ttl"`
}

// Pull ..
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"testing"
"time"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -72,6 +73,10 @@ func TestNew(t *testing.T) {
c.ProjectDefaults.Pull.Pipeline.Jobs.RunnerDescription.AggregationRegexp = `shared-runners-manager-(\d*)\.gitlab\.com`
c.ProjectDefaults.Pull.Pipeline.Variables.Regexp = `.*`

c.Redis.ProjectTTL = 168 * time.Hour
c.Redis.RefTTL = 1 * time.Hour
c.Redis.MetricTTL = 1 * time.Hour

assert.Equal(t, c, New())
}

Expand Down
19 changes: 14 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,23 @@ func New(ctx context.Context, cfg config.Config, version string) (c Controller,
return
}

if err = c.configureRedis(ctx, cfg.Redis.URL); err != nil {
if err = c.configureRedis(ctx, &cfg.Redis); err != nil {
return
}

c.TaskController = NewTaskController(ctx, c.Redis, cfg.Gitlab.MaximumJobsQueueSize)
c.registerTasks()

c.Store = store.New(ctx, c.Redis, c.Config.Projects)
var redisStore *store.Redis
if c.Redis != nil {
redisStore = store.NewRedisStore(c.Redis, store.WithTTLConfig(&store.RedisTTLConfig{
Project: cfg.Redis.ProjectTTL,
Ref: cfg.Redis.RefTTL,
Metric: cfg.Redis.MetricTTL,
}))
}

c.Store = store.New(ctx, redisStore, c.Config.Projects)

if err = c.configureGitlab(cfg.Gitlab, version); err != nil {
return
Expand Down Expand Up @@ -170,11 +179,11 @@ func (c *Controller) configureGitlab(cfg config.Gitlab, version string) (err err
return
}

func (c *Controller) configureRedis(ctx context.Context, url string) (err error) {
func (c *Controller) configureRedis(ctx context.Context, config *config.Redis) (err error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "controller:configureRedis")
defer span.End()

if len(url) <= 0 {
if len(config.URL) <= 0 {
log.Debug("redis url is not configured, skipping configuration & using local driver")

return
Expand All @@ -184,7 +193,7 @@ func (c *Controller) configureRedis(ctx context.Context, url string) (err error)

var opt *redis.Options

if opt, err = redis.ParseURL(url); err != nil {
if opt, err = redis.ParseURL(config.URL); err != nil {
return
}

Expand Down
62 changes: 41 additions & 21 deletions pkg/controller/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,20 @@ func (c *Controller) GarbageCollectRefs(ctx context.Context) error {
return err
}

storedRefsLen := len(storedRefs)
var i int
for _, ref := range storedRefs {
i++
log.WithFields(log.Fields{"progress": i, "total": storedRefsLen}).Debug("ongoing 'refs' garbage collection")
if c.Store.HasRefExpired(ctx, ref.Key()) {
if err = deleteRef(ctx, c.Store, ref, "expired"); err != nil {
return err
}

continue
}

// Check Project Still Exist
projectExists, err := c.Store.ProjectExists(ctx, ref.Project.Key())
if err != nil {
return err
Expand Down Expand Up @@ -275,23 +288,28 @@ func (c *Controller) GarbageCollectMetrics(ctx context.Context) error {
return err
}

storedMetricsLen := len(storedMetrics)
var i int
for k, m := range storedMetrics {
i++
log.WithFields(log.Fields{"progress": i, "total": storedMetricsLen}).Debug("ongoing 'metrics' garbage collection")
if c.Store.HasMetricExpired(ctx, m.Key()) {
if err = deleteMetric(ctx, c.Store, m, "expired"); err != nil {
return err
}

continue
}
// In order to save some memory space we chose to have to recompose
// the Ref the metric belongs to
metricLabelProject, metricLabelProjectExists := m.Labels["project"]
metricLabelRef, metricLabelRefExists := m.Labels["ref"]
metricLabelEnvironment, metricLabelEnvironmentExists := m.Labels["environment"]

if !metricLabelProjectExists || (!metricLabelRefExists && !metricLabelEnvironmentExists) {
if err = c.Store.DelMetric(ctx, k); err != nil {
if err = deleteMetric(ctx, c.Store, m, "project-or-ref-and-environment-label-undefined"); err != nil {
return err
}

log.WithFields(log.Fields{
"metric-kind": m.Kind,
"metric-labels": m.Labels,
"reason": "project-or-ref-and-environment-label-undefined",
}).Info("deleted metric from the store")
}

if metricLabelRefExists && !metricLabelEnvironmentExists {
Expand Down Expand Up @@ -374,33 +392,21 @@ func (c *Controller) GarbageCollectMetrics(ctx context.Context) error {

// If the ref does not exist anymore, delete the metric
if !envExists {
if err = c.Store.DelMetric(ctx, k); err != nil {
if err = deleteMetric(ctx, c.Store, m, "non-existent-environment"); err != nil {
return err
}

log.WithFields(log.Fields{
"metric-kind": m.Kind,
"metric-labels": m.Labels,
"reason": "non-existent-environment",
}).Info("deleted metric from the store")

continue
}

// Check if 'output sparse statuses metrics' has been enabled
switch m.Kind {
case schemas.MetricKindEnvironmentDeploymentStatus:
if env.OutputSparseStatusMetrics && m.Value != 1 {
if err = c.Store.DelMetric(ctx, k); err != nil {
if err = deleteMetric(ctx, c.Store, m, "output-sparse-metrics-enabled-on-environment"); err != nil {
return err
}

log.WithFields(log.Fields{
"metric-kind": m.Kind,
"metric-labels": m.Labels,
"reason": "output-sparse-metrics-enabled-on-environment",
}).Info("deleted metric from the store")

continue
}
}
Expand Down Expand Up @@ -438,3 +444,17 @@ func deleteRef(ctx context.Context, s store.Store, ref schemas.Ref, reason strin

return
}

func deleteMetric(ctx context.Context, s store.Store, m schemas.Metric, reason string) (err error) {
if err = s.DelMetric(ctx, m.Key()); err != nil {
return
}

log.WithFields(log.Fields{
"metric-kind": m.Kind,
"metric-labels": m.Labels,
"reason": reason,
}).Info("deleted metric from the store")

return
}
15 changes: 15 additions & 0 deletions pkg/controller/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ func (c *Controller) PullRefMetrics(ctx context.Context, ref schemas.Ref) error
return fmt.Errorf("error fetching project pipelines for %s: %v", ref.Project.Name, err)
}

if len(pipelines) == 0 && ref.Kind == schemas.RefKindMergeRequest {
refName = fmt.Sprintf("refs/merge-requests/%s/merge", ref.Name)
pipelines, _, err = c.Gitlab.GetProjectPipelines(ctx, ref.Project.Name, &goGitlab.ListProjectPipelinesOptions{
// We only need the most recent pipeline
ListOptions: goGitlab.ListOptions{
PerPage: 1,
Page: 1,
},
Ref: &refName,
})
if err != nil {
return fmt.Errorf("error fetching project pipelines for %s: %v", ref.Project.Name, err)
}
}

if len(pipelines) == 0 {
log.WithFields(logFields).Debug("could not find any pipeline for the ref")

Expand Down
16 changes: 14 additions & 2 deletions pkg/controller/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,28 @@ func (c *Controller) processMergeEvent(ctx context.Context, e goGitlab.MergeEven

switch e.ObjectAttributes.Action {
case "close":
_ = deleteRef(ctx, c.Store, ref, "received merge request close event from webhook")
c.triggerRefDeletion(ctx, ref)
case "merge":
_ = deleteRef(ctx, c.Store, ref, "received merge request merge event from webhook")
c.triggerRefDeletion(ctx, ref)
default:
log.
WithField("merge-request-event-type", e.ObjectAttributes.Action).
Debug("received a non supported merge-request event type as a webhook")
}
}

func (c *Controller) triggerRefDeletion(ctx context.Context, ref schemas.Ref) {
err := c.Store.DelRef(ctx, ref.Key())
if err != nil {
log.WithContext(ctx).
WithFields(log.Fields{
"project-name": ref.Project.Name,
"ref": ref.Name,
}).
Error("failed deleting ref")
}
}

func (c *Controller) triggerRefMetricsPull(ctx context.Context, ref schemas.Ref) {
logFields := log.Fields{
"project-name": ref.Project.Name,
Expand Down
2 changes: 1 addition & 1 deletion pkg/schemas/ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

const (
mergeRequestRegexp string = `^((\d+)|refs/merge-requests/(\d+)/head)$`
mergeRequestRegexp string = `^((\d+)|refs/merge-requests/(\d+)/(?:head|merge))$`

// RefKindBranch refers to a branch.
RefKindBranch RefKind = "branch"
Expand Down
15 changes: 15 additions & 0 deletions pkg/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ type Local struct {
executedTasksCount uint64
}

// HasProjectExpired ..
func (l *Local) HasProjectExpired(ctx context.Context, key schemas.ProjectKey) bool {
return false
}

// HasRefExpired ..
func (l *Local) HasRefExpired(ctx context.Context, key schemas.RefKey) bool {
return false
}

// HasMetricExpired ..
func (l *Local) HasMetricExpired(ctx context.Context, key schemas.MetricKey) bool {
return false
}

// SetProject ..
func (l *Local) SetProject(_ context.Context, p schemas.Project) error {
l.projectsMutex.Lock()
Expand Down
84 changes: 84 additions & 0 deletions pkg/store/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,29 @@ const (
// Redis ..
type Redis struct {
*redis.Client
StoreConfig *RedisStoreConfig
}

// RedisStoreConfig allows to fine tune the store behaviour.
type RedisStoreConfig struct {
TTLConfig *RedisTTLConfig
}

// RedisTTLConfig allows to set the TTL values for the various fields tracked.
type RedisTTLConfig struct {
Project time.Duration
Ref time.Duration
Metric time.Duration
}

func WithTTLConfig(opt *RedisTTLConfig) func(*RedisStoreConfig) {
return func(cfg *RedisStoreConfig) {
cfg.TTLConfig = opt
}
}

type RedisStoreOptions func(opts *RedisStoreConfig)

// SetProject ..
func (r *Redis) SetProject(ctx context.Context, p schemas.Project) error {
marshalledProject, err := msgpack.Marshal(p)
Expand All @@ -35,6 +56,13 @@ func (r *Redis) SetProject(ctx context.Context, p schemas.Project) error {
}

_, err = r.HSet(ctx, redisProjectsKey, string(p.Key()), marshalledProject).Result()
if err != nil {
return err
}

if r.StoreConfig.TTLConfig != nil {
_, err = r.Set(ctx, getTTLProjectKey(p.Key()), true, r.StoreConfig.TTLConfig.Project).Result()
}

return err
}
Expand Down Expand Up @@ -183,6 +211,13 @@ func (r *Redis) SetRef(ctx context.Context, ref schemas.Ref) error {
}

_, err = r.HSet(ctx, redisRefsKey, string(ref.Key()), marshalledRef).Result()
if err != nil {
return err
}

if r.StoreConfig.TTLConfig != nil {
_, err = r.Set(ctx, getTTLRefKey(ref.Key()), true, r.StoreConfig.TTLConfig.Ref).Result()
}

return err
}
Expand Down Expand Up @@ -257,6 +292,13 @@ func (r *Redis) SetMetric(ctx context.Context, m schemas.Metric) error {
}

_, err = r.HSet(ctx, redisMetricsKey, string(m.Key()), marshalledMetric).Result()
if err != nil {
return err
}

if r.StoreConfig.TTLConfig != nil {
_, err = r.Set(ctx, getTTLMetricKey(m.Key()), true, r.StoreConfig.TTLConfig.Metric).Result()
}

return err
}
Expand Down Expand Up @@ -418,3 +460,45 @@ func (r *Redis) ExecutedTasksCount(ctx context.Context) (uint64, error) {

return uint64(c), err
}

// HasProjectExpired ..
func (r *Redis) HasProjectExpired(ctx context.Context, key schemas.ProjectKey) bool {
reply, err := r.Exists(ctx, getTTLProjectKey(key)).Result()
if err != nil {
return false
}

return reply > 0
}

func getTTLProjectKey(key schemas.ProjectKey) string {
return fmt.Sprintf("%s:%s", redisProjectsKey, key)
}

// HasRefExpired ..
func (r *Redis) HasRefExpired(ctx context.Context, key schemas.RefKey) bool {
reply, err := r.Exists(ctx, getTTLRefKey(key)).Result()
if err != nil {
return false
}

return reply > 0
}

func getTTLRefKey(key schemas.RefKey) string {
return fmt.Sprintf("%s:%s", redisRefsKey, key)
}

// HasMetricExpired ..
func (r *Redis) HasMetricExpired(ctx context.Context, key schemas.MetricKey) bool {
reply, err := r.Exists(ctx, getTTLMetricKey(key)).Result()
if err != nil {
return false
}

return reply > 0
}

func getTTLMetricKey(key schemas.MetricKey) string {
return fmt.Sprintf("%s:%s", redisMetricsKey, key)
}
Loading