Skip to content

Commit

Permalink
Fixing failures caused by using multiple middleware (#196)
Browse files Browse the repository at this point in the history
* Fixing failures caused by using multiple middleware

* refactor(middleware/otel): improve readability of test code by formatting `cron.NewEntry` call

* test(issue-190): add test case for distributed no-overlapping middleware with Redis mutex

* refactor(tests): remove unused import and simplify logging in issue190_test.go
  • Loading branch information
flc1125 authored Jan 22, 2025
1 parent e0e5142 commit 5893ebd
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job, middlewares ...Middleware) E
c.runningMu.Lock()
defer c.runningMu.Unlock()
c.nextID++
entry := newEntry(c.nextID, schedule, cmd, withEntryMiddlewares(
entry := NewEntry(c.nextID, schedule, cmd, WithEntryMiddlewares(
append(c.middlewares, middlewares...)...),
)
if !c.running {
Expand Down
8 changes: 4 additions & 4 deletions entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ type Entry struct {
middlewares []Middleware
}

type entryOption func(*Entry)
type EntryOption func(*Entry)

func withEntryMiddlewares(middlewares ...Middleware) entryOption {
func WithEntryMiddlewares(middlewares ...Middleware) EntryOption {
return func(e *Entry) {
e.middlewares = middlewares
}
}

// newEntry creates a new entry with the given schedule and job.
func newEntry(id EntryID, schedule Schedule, job Job, opts ...entryOption) *Entry {
// NewEntry creates a new entry with the given schedule and job.
func NewEntry(id EntryID, schedule Schedule, job Job, opts ...EntryOption) *Entry {
entry := &Entry{
id: id,
schedule: schedule,
Expand Down
4 changes: 2 additions & 2 deletions entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestEntry_Attributes(t *testing.T) {
entry := newEntry(1, nil, JobFunc(func(context.Context) error {
entry := NewEntry(1, nil, JobFunc(func(context.Context) error {
return nil
}))
assert.Equal(t, entry.ID(), EntryID(1))
Expand Down Expand Up @@ -41,7 +41,7 @@ func TestEntry_Context(t *testing.T) {
assert.Nil(t, entry)

// existent entry
entry = newEntry(tt.id, nil, JobFunc(func(ctx context.Context) error {
entry = NewEntry(tt.id, nil, JobFunc(func(ctx context.Context) error {
entry, ok := EntryFromContext(ctx)
assert.True(t, ok)
assert.Equal(t, entry.ID(), tt.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ func New(mu Mutex, opts ...Option) cron.Middleware {
o := newOptions(mu, opts...)
return func(original cron.Job) cron.Job {
return cron.JobFunc(func(ctx context.Context) error {
job, ok := any(original).(JobWithMutex)
entry, ok := cron.EntryFromContext(ctx)
if !ok {
return original.Run(ctx)
}

// fix: https://github.com/flc1125/go-cron/issues/190
// retrieve original job data
job, ok := any(entry.Job()).(JobWithMutex)
if !ok {
return original.Run(ctx)
}
Expand Down
28 changes: 11 additions & 17 deletions middleware/otel/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ func New(opts ...Option) cron.Middleware {
tracer := o.tp.Tracer(scopeName)
return func(original cron.Job) cron.Job {
return cron.JobFunc(func(ctx context.Context) error {
job, ok := any(original).(JobWithName)
entry, ok := cron.EntryFromContext(ctx)
if !ok {
return original.Run(ctx)
}

job, ok := any(entry.Job()).(JobWithName)
if !ok {
return original.Run(ctx)
}
Expand All @@ -63,10 +68,12 @@ func New(opts ...Option) cron.Middleware {
)
defer span.End()

span.SetAttributes(append(
entryAttributes(ctx),
span.SetAttributes(
attrJobID.Int(int(entry.ID())),
attrJobName.String(job.Name()),
)...)
attrJobPrevTime.String(entry.Prev().String()),
attrJobNextTime.String(entry.Next().String()),
)

err := job.Run(ctx)
if err != nil {
Expand All @@ -78,16 +85,3 @@ func New(opts ...Option) cron.Middleware {
})
}
}

func entryAttributes(ctx context.Context) []attribute.KeyValue {
entry, ok := cron.EntryFromContext(ctx)
if !ok {
return []attribute.KeyValue{}
}

return []attribute.KeyValue{
attrJobID.Int(int(entry.ID())),
attrJobPrevTime.String(entry.Prev().String()),
attrJobNextTime.String(entry.Next().String()),
}
}
9 changes: 8 additions & 1 deletion middleware/otel/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package otel

import (
"context"
"math/rand/v2"
"testing"

"github.com/flc1125/go-cron/v4"
Expand Down Expand Up @@ -61,14 +62,20 @@ func TestTracing(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
defer imsb.Reset()

require.Equal(t, tt.error, middleware(&mockJob{t: t, name: tt.name, err: tt.error}).Run(ctx))
entry := cron.NewEntry(
cron.EntryID(rand.IntN(10)), nil,
&mockJob{t: t, name: tt.name, err: tt.error}, cron.WithEntryMiddlewares(middleware),
)

require.Equal(t, tt.error, entry.WrappedJob().Run(ctx))
require.Len(t, imsb.GetSpans(), 1)

span := imsb.GetSpans()[0]
assert.Equal(t, "cron "+tt.name, span.Name)
assert.NotEmpty(t, span.SpanContext.TraceID())
assert.NotEmpty(t, span.SpanContext.SpanID())
assert.Equal(t, trace.SpanKindInternal, span.SpanKind)
assert.Contains(t, span.Attributes, attribute.Int("cron.job.id", int(entry.ID())))
assert.Contains(t, span.Attributes, attribute.String("cron.job.name", tt.name))
assert.Contains(t, span.Attributes, attribute.String("test.job", tt.name))
tt.extraTesting(t, span)
Expand Down
40 changes: 40 additions & 0 deletions tests/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module github.com/flc1125/go-cron/tests/v4

go 1.22.0

replace (
github.com/flc1125/go-cron/middleware/distributednooverlapping/redismutex/v4 => ../middleware/distributednooverlapping/redismutex
github.com/flc1125/go-cron/middleware/distributednooverlapping/v4 => ../middleware/distributednooverlapping
github.com/flc1125/go-cron/middleware/nooverlapping/v4 => ../middleware/nooverlapping
github.com/flc1125/go-cron/middleware/otel/v4 => ../middleware/otel
github.com/flc1125/go-cron/middleware/recovery/v4 => ../middleware/recovery
github.com/flc1125/go-cron/v4 => ../
)

require (
github.com/flc1125/go-cron/middleware/distributednooverlapping/redismutex/v4 v4.3.2
github.com/flc1125/go-cron/middleware/distributednooverlapping/v4 v4.3.2
github.com/flc1125/go-cron/middleware/nooverlapping/v4 v4.3.2
github.com/flc1125/go-cron/middleware/otel/v4 v4.3.2
github.com/flc1125/go-cron/middleware/recovery/v4 v4.3.2
github.com/flc1125/go-cron/v4 v4.3.2
github.com/redis/go-redis/v9 v9.7.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/otel/sdk v1.34.0
)

require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
50 changes: 50 additions & 0 deletions tests/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/flc1125/go-cron/crontest/v4 v4.3.2 h1:eX01C1ncRPi3o9JWoJCo1g6IiVB5wJL7mAZGdU2IDPo=
github.com/flc1125/go-cron/crontest/v4 v4.3.2/go.mod h1:jUm+tEvRTjYJGwjDlEjZodtigsrWqemI07tp7dR74vA=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
77 changes: 77 additions & 0 deletions tests/issue-190/issue190_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package issue_190 //nolint:revive

import (
"context"
"testing"
"time"

"github.com/flc1125/go-cron/middleware/distributednooverlapping/redismutex/v4"
"github.com/flc1125/go-cron/middleware/distributednooverlapping/v4"
"github.com/flc1125/go-cron/middleware/nooverlapping/v4"
"github.com/flc1125/go-cron/middleware/otel/v4"
"github.com/flc1125/go-cron/middleware/recovery/v4"
"github.com/flc1125/go-cron/v4"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

type exampleJob struct {
t *testing.T
name string
}

var (
_ cron.Job = (*exampleJob)(nil)
_ distributednooverlapping.JobWithMutex = (*exampleJob)(nil)
_ otel.JobWithName = (*exampleJob)(nil)
)

func (j exampleJob) Run(context.Context) error {
time.Sleep(2 * time.Second)
j.t.Logf("job %s is running", j.name)
return nil
}

func (j exampleJob) Name() string {
return j.name
}

func (j exampleJob) GetMutexKey() string {
return j.Name()
}

func (j exampleJob) GetMutexTTL() time.Duration {
return time.Minute
}

func TestIssue190(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.FlushAll(context.Background())

mutex := redismutex.New(rdb, redismutex.WithPrefix("cron"))
imsb := tracetest.NewInMemoryExporter()
provider := sdktrace.NewTracerProvider(sdktrace.WithSyncer(imsb))

c := cron.New(
cron.WithSeconds(),
cron.WithMiddleware(
recovery.New(),
nooverlapping.New(),
distributednooverlapping.New(mutex),
otel.New(otel.WithTracerProvider(provider)),
),
)

_, _ = c.AddJob("* * * * * *", &exampleJob{t: t, name: "job1"})

c.Start()
defer c.Stop()

time.Sleep(6 * time.Second)

assert.LessOrEqual(t, len(imsb.GetSpans()), 3)
}

0 comments on commit 5893ebd

Please sign in to comment.