Skip to content

Commit

Permalink
Added more poison queue context (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak authored Dec 17, 2019
1 parent 652a62c commit e886354
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
12 changes: 10 additions & 2 deletions message/router/middleware/poison.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ import (
// ErrInvalidPoisonQueueTopic occurs when the topic supplied to the PoisonQueue constructor is invalid.
var ErrInvalidPoisonQueueTopic = errors.New("invalid poison queue topic")

// ReasonForPoisonedKey is the metadata key which marks the reason (error) why the message was deemed poisoned.
var ReasonForPoisonedKey = "reason_poisoned"
// Metadata keys which marks the reason and context why the message was deemed poisoned.
const (
ReasonForPoisonedKey = "reason_poisoned"
PoisonedTopicKey = "topic_poisoned"
PoisonedHandlerKey = "handler_poisoned"
PoisonedSubscriberKey = "subscriber_poisoned"
)

type poisonQueue struct {
topic string
Expand Down Expand Up @@ -61,6 +66,9 @@ func (pq poisonQueue) publishPoisonMessage(msg *message.Message, err error) erro

// add context why it was poisoned
msg.Metadata.Set(ReasonForPoisonedKey, err.Error())
msg.Metadata.Set(PoisonedTopicKey, message.SubscribeTopicFromCtx(msg.Context()))
msg.Metadata.Set(PoisonedHandlerKey, message.HandlerNameFromCtx(msg.Context()))
msg.Metadata.Set(PoisonedSubscriberKey, message.SubscriberNameFromCtx(msg.Context()))

// don't intercept error from publish. Can't help you if the publisher is down as well.
return pq.pub.Publish(pq.topic, msg)
Expand Down
52 changes: 52 additions & 0 deletions message/router/middleware/poison_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package middleware_test

import (
"context"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message/subscriber"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"testing"
"time"

"github.com/hashicorp/go-multierror"

Expand Down Expand Up @@ -106,6 +111,53 @@ func TestPoisonQueue_handler_failing(t *testing.T) {
}
}

func TestPoisonQueue_context_values(t *testing.T) {
pubSub := gochannel.NewGoChannel(
gochannel.Config{Persistent: true},
watermill.NewStdLogger(true, true),
)

logger := watermill.NewStdLogger(true, true)

router, err := message.NewRouter(message.RouterConfig{}, logger)
require.NoError(t, err)

pq, err := middleware.PoisonQueue(pubSub, "poison_queue")
require.NoError(t, err)
router.AddMiddleware(pq)

router.AddNoPublisherHandler("handler_name", "test", pubSub, func(msg *message.Message) error {
return errors.New("error")
})

go func() {
require.NoError(t, router.Run(context.Background()))
}()
require.NoError(t, err)
defer router.Close()

select {
case <-router.Running():
// ok
case <-time.After(time.Second):
t.Fatal("waiting for router timeout")
}

err = pubSub.Publish("test", message.NewMessage("1", nil))
require.NoError(t, err)

msgs, err := pubSub.Subscribe(context.Background(), "poison_queue")
require.NoError(t, err)

messages, all := subscriber.BulkRead(msgs, 1, time.Second)
require.True(t, all, "no messages received")

assert.Equal(t, "handler_name", messages[0].Metadata[middleware.PoisonedHandlerKey])
assert.Equal(t, "gochannel.GoChannel", messages[0].Metadata[middleware.PoisonedSubscriberKey])
assert.Equal(t, "test", messages[0].Metadata[middleware.PoisonedTopicKey])
assert.Equal(t, "error", messages[0].Metadata[middleware.ReasonForPoisonedKey])
}

func TestPoisonQueue_handler_failing_publisher_failing(t *testing.T) {
poisonPublisher := mockPublisher{behaviour: BehaviourAlwaysFail}

Expand Down

0 comments on commit e886354

Please sign in to comment.