diff --git a/UPGRADE-1.0.md b/UPGRADE-1.0.md index 9f669054d..fd5ee62b4 100644 --- a/UPGRADE-1.0.md +++ b/UPGRADE-1.0.md @@ -3,3 +3,4 @@ Migrating Pub/Subs find . -type f -iname '*.go' -exec sed -i -E "s/github\.com\/ThreeDotsLabs\/watermill\/message\/infrastructure\/(amqp|googlecloud|http|io|kafka|nats|sql)/github.com\/ThreeDotsLabs\/watermill-\1\/pkg\/\1/" "{}" +; + find . -type f -iname '*.go' -exec sed -i -E "s/github\.com\/ThreeDotsLabs\/watermill\/message\/infrastructure\/gochannel/github\.com\/ThreeDotsLabs\/watermill\/pubsub\/gochannel/" "{}" +; diff --git a/_examples/metrics/main.go b/_examples/metrics/main.go index c799bbf4a..f412cc6df 100644 --- a/_examples/metrics/main.go +++ b/_examples/metrics/main.go @@ -11,9 +11,9 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/components/metrics" "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/ThreeDotsLabs/watermill/message/router/plugin" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" ) var ( diff --git a/components/cqrs/cqrs_test.go b/components/cqrs/cqrs_test.go index a1508fb31..1589dca2e 100644 --- a/components/cqrs/cqrs_test.go +++ b/components/cqrs/cqrs_test.go @@ -12,7 +12,7 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" ) // TestCQRS is functional test of CQRS command handler and event handler. diff --git a/docs/content/docs/getting-started/go-channel/main.go b/docs/content/docs/getting-started/go-channel/main.go index 10f85ca99..0ef1f85a5 100644 --- a/docs/content/docs/getting-started/go-channel/main.go +++ b/docs/content/docs/getting-started/go-channel/main.go @@ -9,7 +9,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" ) func main() { diff --git a/docs/content/docs/getting-started/router/main.go b/docs/content/docs/getting-started/router/main.go index 597fca716..2b7c6acea 100644 --- a/docs/content/docs/getting-started/router/main.go +++ b/docs/content/docs/getting-started/router/main.go @@ -9,9 +9,9 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/ThreeDotsLabs/watermill/message/router/plugin" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" ) var ( diff --git a/docs/content/docs/pub-sub.md b/docs/content/docs/pub-sub.md index bd415ad14..f35a9a450 100644 --- a/docs/content/docs/pub-sub.md +++ b/docs/content/docs/pub-sub.md @@ -62,7 +62,7 @@ Every Pub/Sub is similar in most aspects. To avoid implementing separate tests for every Pub/Sub, we've created a test suite which should be passed by any Pub/Sub implementation. -These tests can be found in `message/infrastructure/test_pubsub.go`. +These tests can be found in `pubsub/tests/test_pubsub.go`. ### Built-in implementations diff --git a/message/decorator_test.go b/message/decorator_test.go index 175e43ae3..1025f6163 100644 --- a/message/decorator_test.go +++ b/message/decorator_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "github.com/ThreeDotsLabs/watermill/message/infrastructure" + "github.com/ThreeDotsLabs/watermill/pubsub/tests" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -14,8 +14,8 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel" "github.com/ThreeDotsLabs/watermill/message/subscriber" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" ) var noop = func(*message.Message) {} @@ -124,7 +124,7 @@ func TestMessageTransformSubscriberDecorator_Subscribe(t *testing.T) { received, all := subscriber.BulkRead(messages, numMessages, time.Second) require.True(t, all) - infrastructure.AssertAllMessagesReceived(t, sent, received) + tests.AssertAllMessagesReceived(t, sent, received) for _, msg := range received { assert.Equal( diff --git a/message/router.go b/message/router.go index 5e0ddbbdc..cf734a56f 100644 --- a/message/router.go +++ b/message/router.go @@ -10,7 +10,7 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/internal" - sync_internal "github.com/ThreeDotsLabs/watermill/message/infrastructure/sync" + sync_internal "github.com/ThreeDotsLabs/watermill/pubsub/sync" ) var ( diff --git a/message/router_test.go b/message/router_test.go index 93ab0f123..ab54d3329 100644 --- a/message/router_test.go +++ b/message/router_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/ThreeDotsLabs/watermill/message/infrastructure" + "github.com/ThreeDotsLabs/watermill/pubsub/tests" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -15,8 +15,8 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/internal" "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel" "github.com/ThreeDotsLabs/watermill/message/subscriber" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" ) func TestRouter_functional(t *testing.T) { @@ -105,14 +105,14 @@ func TestRouter_functional(t *testing.T) { receivedMessages1, all := subscriber.BulkRead(receivedMessagesCh1, len(expectedReceivedMessages), time.Second*10) assert.True(t, all) - infrastructure.AssertAllMessagesReceived(t, expectedReceivedMessages, receivedMessages1) + tests.AssertAllMessagesReceived(t, expectedReceivedMessages, receivedMessages1) receivedMessages2, all := subscriber.BulkRead(receivedMessagesCh2, len(expectedReceivedMessages), time.Second*10) assert.True(t, all) - infrastructure.AssertAllMessagesReceived(t, expectedReceivedMessages, receivedMessages2) + tests.AssertAllMessagesReceived(t, expectedReceivedMessages, receivedMessages2) <-allPublishedByHandler - infrastructure.AssertAllMessagesReceived(t, expectedSentByHandler, publishedByHandler) + tests.AssertAllMessagesReceived(t, expectedSentByHandler, publishedByHandler) } func TestRouter_functional_nack(t *testing.T) { @@ -162,7 +162,7 @@ func TestRouter_functional_nack(t *testing.T) { messages, all := subscriber.BulkRead(messageReceived, 2, time.Second) assert.True(t, all, "not all messages received, probably not ack received, received %d", len(messages)) - infrastructure.AssertAllMessagesReceived(t, []*message.Message{publishedMsg, publishedMsg}, messages) + tests.AssertAllMessagesReceived(t, []*message.Message{publishedMsg, publishedMsg}, messages) } func TestRouter_stop_when_all_handlers_stopped(t *testing.T) { diff --git a/message/subscriber/read_test.go b/message/subscriber/read_test.go index df63861f3..40bb8c6a5 100644 --- a/message/subscriber/read_test.go +++ b/message/subscriber/read_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/ThreeDotsLabs/watermill/message/infrastructure" + "github.com/ThreeDotsLabs/watermill/pubsub/tests" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" @@ -46,7 +46,7 @@ func TestBulkRead(t *testing.T) { readMessages, all := subscriber.BulkRead(messagesCh, messagesCount, time.Second) assert.True(t, all) - infrastructure.AssertAllMessagesReceived(t, messages, readMessages) + tests.AssertAllMessagesReceived(t, messages, readMessages) }) } } diff --git a/message/infrastructure/doc.go b/pubsub/doc.go similarity index 90% rename from message/infrastructure/doc.go rename to pubsub/doc.go index 290e9395a..a0d120609 100644 --- a/message/infrastructure/doc.go +++ b/pubsub/doc.go @@ -3,4 +3,4 @@ // Detailed Pub/Subs docs: https://watermill.io/docs/pub-sub-implementations/ // Getting started guide: https://watermill.io/docs/getting-started/ -package infrastructure +package pubsub diff --git a/pubsub/gochannel/doc.go b/pubsub/gochannel/doc.go new file mode 100644 index 000000000..ffc39d4ef --- /dev/null +++ b/pubsub/gochannel/doc.go @@ -0,0 +1,5 @@ +// This is just the simplest Pub/Sub implementation +// +// All Pub/Sub implementations can be found at https://watermill.io/docs/pub-sub-implementations/ + +package gochannel diff --git a/message/infrastructure/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go similarity index 100% rename from message/infrastructure/gochannel/pubsub.go rename to pubsub/gochannel/pubsub.go diff --git a/message/infrastructure/gochannel/pubsub_bench_test.go b/pubsub/gochannel/pubsub_bench_test.go similarity index 64% rename from message/infrastructure/gochannel/pubsub_bench_test.go rename to pubsub/gochannel/pubsub_bench_test.go index cea55a0dc..9d0bc56ee 100644 --- a/message/infrastructure/gochannel/pubsub_bench_test.go +++ b/pubsub/gochannel/pubsub_bench_test.go @@ -3,14 +3,16 @@ package gochannel_test import ( "testing" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + + "github.com/ThreeDotsLabs/watermill/pubsub/tests" + "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/infrastructure" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel" ) func BenchmarkSubscriber(b *testing.B) { - infrastructure.BenchSubscriber(b, func(n int) (message.Publisher, message.Subscriber) { + tests.BenchSubscriber(b, func(n int) (message.Publisher, message.Subscriber) { pubSub := gochannel.NewGoChannel( gochannel.Config{OutputChannelBuffer: int64(n)}, watermill.NopLogger{}, ) @@ -19,7 +21,7 @@ func BenchmarkSubscriber(b *testing.B) { } func BenchmarkSubscriberPersistent(b *testing.B) { - infrastructure.BenchSubscriber(b, func(n int) (message.Publisher, message.Subscriber) { + tests.BenchSubscriber(b, func(n int) (message.Publisher, message.Subscriber) { pubSub := gochannel.NewGoChannel( gochannel.Config{ OutputChannelBuffer: int64(n), diff --git a/message/infrastructure/gochannel/pubsub_stress_test.go b/pubsub/gochannel/pubsub_stress_test.go similarity index 72% rename from message/infrastructure/gochannel/pubsub_stress_test.go rename to pubsub/gochannel/pubsub_stress_test.go index 346603a29..efb04744f 100644 --- a/message/infrastructure/gochannel/pubsub_stress_test.go +++ b/pubsub/gochannel/pubsub_stress_test.go @@ -5,13 +5,13 @@ package gochannel_test import ( "testing" - "github.com/ThreeDotsLabs/watermill/message/infrastructure" + "github.com/ThreeDotsLabs/watermill/pubsub/tests" ) func TestPublishSubscribe_stress(t *testing.T) { - infrastructure.TestPubSubStressTest( + tests.TestPubSubStressTest( t, - infrastructure.Features{ + tests.Features{ ConsumerGroups: false, ExactlyOnceDelivery: true, GuaranteedOrder: false, diff --git a/message/infrastructure/gochannel/pubsub_test.go b/pubsub/gochannel/pubsub_test.go similarity index 90% rename from message/infrastructure/gochannel/pubsub_test.go rename to pubsub/gochannel/pubsub_test.go index 81bef5584..b47e8438b 100644 --- a/message/infrastructure/gochannel/pubsub_test.go +++ b/pubsub/gochannel/pubsub_test.go @@ -8,12 +8,14 @@ import ( "testing" "time" + "github.com/ThreeDotsLabs/watermill/pubsub/tests" + + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/stretchr/testify/assert" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/infrastructure" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel" "github.com/ThreeDotsLabs/watermill/message/subscriber" "github.com/stretchr/testify/require" ) @@ -30,9 +32,9 @@ func createPersistentPubSub(t *testing.T) (message.Publisher, message.Subscriber } func TestPublishSubscribe_persistent(t *testing.T) { - infrastructure.TestPubSub( + tests.TestPubSub( t, - infrastructure.Features{ + tests.Features{ ConsumerGroups: false, ExactlyOnceDelivery: true, GuaranteedOrder: false, @@ -55,10 +57,10 @@ func TestPublishSubscribe_not_persistent(t *testing.T) { msgs, err := pubSub.Subscribe(context.Background(), topicName) require.NoError(t, err) - sendMessages := infrastructure.PublishSimpleMessages(t, messagesCount, pubSub, topicName) + sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName) receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second) - infrastructure.AssertAllMessagesReceived(t, sendMessages, receivedMsgs) + tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs) assert.NoError(t, pubSub.Close()) } @@ -182,6 +184,6 @@ func testPublishSubscribeSubRace(t *testing.T) { log.Println("asserting") for subMsgs := range subscriberReceivedCh { - infrastructure.AssertAllMessagesReceived(t, sentMessages, subMsgs) + tests.AssertAllMessagesReceived(t, sentMessages, subMsgs) } } diff --git a/message/infrastructure/sync/waitgroup.go b/pubsub/sync/waitgroup.go similarity index 100% rename from message/infrastructure/sync/waitgroup.go rename to pubsub/sync/waitgroup.go diff --git a/message/infrastructure/sync/waitgroup_test.go b/pubsub/sync/waitgroup_test.go similarity index 100% rename from message/infrastructure/sync/waitgroup_test.go rename to pubsub/sync/waitgroup_test.go diff --git a/message/infrastructure/bench_pubsub.go b/pubsub/tests/bench_pubsub.go similarity index 97% rename from message/infrastructure/bench_pubsub.go rename to pubsub/tests/bench_pubsub.go index 1d99944f8..2090d0b27 100644 --- a/message/infrastructure/bench_pubsub.go +++ b/pubsub/tests/bench_pubsub.go @@ -1,4 +1,4 @@ -package infrastructure +package tests import ( "context" diff --git a/message/infrastructure/test_asserts.go b/pubsub/tests/test_asserts.go similarity index 98% rename from message/infrastructure/test_asserts.go rename to pubsub/tests/test_asserts.go index 0c4259297..5535ce56d 100644 --- a/message/infrastructure/test_asserts.go +++ b/pubsub/tests/test_asserts.go @@ -1,4 +1,4 @@ -package infrastructure +package tests import ( "sort" diff --git a/message/infrastructure/test_pubsub.go b/pubsub/tests/test_pubsub.go similarity index 99% rename from message/infrastructure/test_pubsub.go rename to pubsub/tests/test_pubsub.go index ace7df1d3..3404a0226 100644 --- a/message/infrastructure/test_pubsub.go +++ b/pubsub/tests/test_pubsub.go @@ -1,4 +1,4 @@ -package infrastructure +package tests import ( "context" diff --git a/message/infrastructure/test_pubsub_stress.go b/pubsub/tests/test_pubsub_stress.go similarity index 80% rename from message/infrastructure/test_pubsub_stress.go rename to pubsub/tests/test_pubsub_stress.go index 3c46b469e..1fdc62aeb 100644 --- a/message/infrastructure/test_pubsub_stress.go +++ b/pubsub/tests/test_pubsub_stress.go @@ -1,6 +1,6 @@ // +build stress -package infrastructure +package tests func init() { // stress tests may work a bit slower