From e88635446b74789f5c591b0f089fec7f6d6bff72 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Tue, 17 Dec 2019 13:19:46 +0100 Subject: [PATCH] Added more poison queue context (#170) --- message/router/middleware/poison.go | 12 +++++- message/router/middleware/poison_test.go | 52 ++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/message/router/middleware/poison.go b/message/router/middleware/poison.go index 0185f5f9d..dd8c0e353 100644 --- a/message/router/middleware/poison.go +++ b/message/router/middleware/poison.go @@ -9,8 +9,13 @@ import ( // ErrInvalidPoisonQueueTopic occurs when the topic supplied to the PoisonQueue constructor is invalid. var ErrInvalidPoisonQueueTopic = errors.New("invalid poison queue topic") -// ReasonForPoisonedKey is the metadata key which marks the reason (error) why the message was deemed poisoned. -var ReasonForPoisonedKey = "reason_poisoned" +// Metadata keys which marks the reason and context why the message was deemed poisoned. +const ( + ReasonForPoisonedKey = "reason_poisoned" + PoisonedTopicKey = "topic_poisoned" + PoisonedHandlerKey = "handler_poisoned" + PoisonedSubscriberKey = "subscriber_poisoned" +) type poisonQueue struct { topic string @@ -61,6 +66,9 @@ func (pq poisonQueue) publishPoisonMessage(msg *message.Message, err error) erro // add context why it was poisoned msg.Metadata.Set(ReasonForPoisonedKey, err.Error()) + msg.Metadata.Set(PoisonedTopicKey, message.SubscribeTopicFromCtx(msg.Context())) + msg.Metadata.Set(PoisonedHandlerKey, message.HandlerNameFromCtx(msg.Context())) + msg.Metadata.Set(PoisonedSubscriberKey, message.SubscriberNameFromCtx(msg.Context())) // don't intercept error from publish. Can't help you if the publisher is down as well. return pq.pub.Publish(pq.topic, msg) diff --git a/message/router/middleware/poison_test.go b/message/router/middleware/poison_test.go index d2cac562f..ffe67767b 100644 --- a/message/router/middleware/poison_test.go +++ b/message/router/middleware/poison_test.go @@ -1,7 +1,12 @@ package middleware_test import ( + "context" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message/subscriber" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" "testing" + "time" "github.com/hashicorp/go-multierror" @@ -106,6 +111,53 @@ func TestPoisonQueue_handler_failing(t *testing.T) { } } +func TestPoisonQueue_context_values(t *testing.T) { + pubSub := gochannel.NewGoChannel( + gochannel.Config{Persistent: true}, + watermill.NewStdLogger(true, true), + ) + + logger := watermill.NewStdLogger(true, true) + + router, err := message.NewRouter(message.RouterConfig{}, logger) + require.NoError(t, err) + + pq, err := middleware.PoisonQueue(pubSub, "poison_queue") + require.NoError(t, err) + router.AddMiddleware(pq) + + router.AddNoPublisherHandler("handler_name", "test", pubSub, func(msg *message.Message) error { + return errors.New("error") + }) + + go func() { + require.NoError(t, router.Run(context.Background())) + }() + require.NoError(t, err) + defer router.Close() + + select { + case <-router.Running(): + // ok + case <-time.After(time.Second): + t.Fatal("waiting for router timeout") + } + + err = pubSub.Publish("test", message.NewMessage("1", nil)) + require.NoError(t, err) + + msgs, err := pubSub.Subscribe(context.Background(), "poison_queue") + require.NoError(t, err) + + messages, all := subscriber.BulkRead(msgs, 1, time.Second) + require.True(t, all, "no messages received") + + assert.Equal(t, "handler_name", messages[0].Metadata[middleware.PoisonedHandlerKey]) + assert.Equal(t, "gochannel.GoChannel", messages[0].Metadata[middleware.PoisonedSubscriberKey]) + assert.Equal(t, "test", messages[0].Metadata[middleware.PoisonedTopicKey]) + assert.Equal(t, "error", messages[0].Metadata[middleware.ReasonForPoisonedKey]) +} + func TestPoisonQueue_handler_failing_publisher_failing(t *testing.T) { poisonPublisher := mockPublisher{behaviour: BehaviourAlwaysFail}